Kafka消息队列
Kafka基础架构
Kafka是一个分布式的基于发布/订阅模式的消息队列
总体上分为 生产者Producer、服务端(集群)Broker、消费者Consumer
其中 Broker 包含三个重要概念:主题Topic、分区Partition、副本Replica
基本流程为:
- 生产者向一个主题发送消息,消费者订阅一个主题后能接收到该主题上的消息
- 一个主题的消息可以分布在多个Broker上,称为分区
- 为了保证高可用,分区的数据引入副本的概念,多个副本会尽量分布在不同的Broker上,并从中选出一个Leader与生产者、消费者通信,具体见后文
基本架构图
生产者基础架构
- 生产者调用
send(topic,value)
发送数据,经过拦截器和序列化器处理数据 - 到达分区器之后,根据分区规则确定分区,发送到该分区对应的缓存队列中(没有则创建)
- 当缓存队列里的数据大于等于发送批次大小(16K)或到达间隔时间时,唤醒
Sender线程
Sender
线程从缓冲队列读取数据,封装成InflightRequest 请求,通过Selector ,按照Broker节点id为单位批量发送给服务端- Broker接收到后应答ACK,若成功则在BufferPool清理对应数据的内存,失败则重试
分区规则:
- 如果指定了分区,则发送到该分区
- 自定义分区,可以自定义分区器,实现
Partitioner
接口,再配置上Class即可 - 如果没指定分区和自定义分区器,但指定了key,则发送到
key.hashcode() % partitions.size()
分区 - 如果都没指定,则按照
粘性分区策略
,即随机一个分区,之后的数据发送到该分区,直到发满一个批次(16KB)或已完成,再随机一个不同的分区
Broker基础架构
消费者基础架构
初始化过程
- 定位Coordinator:Consumer会向最近的Broker询问自己对应的 coordinator所在的Broker
- 消费者组每个consumer向coordinator发送join group请求,coordinator从中选出一个consumer作为leader,并把要消费的topic信息发送给该leader
- 该leader负责制定消费方案,并发送给coordinator,由它发送给组内的所有消费者执行
消费方案(消费分区分配策略)
举例:七个分区,消费组有3个
- Range(默认):均匀按序分配,前面的消费者多消费,如: (0,1,2) (3,4) (5,6)
- Round Robin:均匀轮询分配,如 (0,3,6) (1,4) (2,5)
- Sticky Strategy:粘性分配,随机均匀分配
消费过程
ZooKeeper在Kafka中的作用
- 参考自:博弈史密斯 和 JavaGuide
- Broker 注册 :在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到
/brokers/ids
下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去 - Topic 注册 : 在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:
/brokers/topics/my-topic/Partitions/0
、/brokers/topics/my-topic/Partitions/1
- 负载均衡 :上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
Kafka文件管理
文件存储
在server.properties
文件中配置log.dirs=D:\Kafka\kafka_2.13-3.1.0\data
,这里会存储消息数据
Kafka中的消息以磁盘文件的方式存储,topic作为逻辑分类,partition作为物理文件格式,文件夹格式为topic-partition
,如下👇
每个文件夹都代表一个partition的log数据,为了防止数据太大不好定位和操作,对其进行分片和索引,分为一个个的segment
,每个segment
最大1G,里面由.log
、.index
、.timeindex
三种文件,分别用于记录数据,稀疏索引、和时间戳索引
文件清除策略
默认情况下,与.timeindex
中最后一条记录的时间超过七天,则会触发文件清除策略
- 直接删除(默认)
- 压缩:对于相同key不同value,只保留最新版本
高效读写
-
Kafka本身分布式集群,采用分区技术,并行度高
-
读取时采用稀疏索引,可以快速定位要消费的数据
-
顺序读写磁盘
-
页缓存和零拷贝技术
具体参考:小林coding
高可用
-
每个主题(topic)都设置一定数量的分区(partitions),然后分布在多个节点(brokers)上,组成分布式的消息队列
-
每个分区都会有若干个副本(replica),分为leader副本和follower副本,它们通常不在同一个服务器节点上,保证了一台服务器宕机时,还能从没宕机的服务器上找到数据
-
生产者和消费者读写数据都在leader副本上进行,follower主动从leader副本同步数据,不开放follower是为了均衡数据一致性和系统复杂度
-
如何选取Leader?
-
Follower故障了怎么办?
- 临时踢出ISR
- 恢复后读取本地磁盘上次的HW,将高于HW的数据删除,重新同步
- 等到LEO赶上该Partition的HW之后再加入ISR
-
Leader故障了怎么办?
数据可靠
数据可靠,也就是服务器接收到数据至少一次:
- acks级别为-1:所有ISR中的副本都落盘后才返回应答,即使leader应答后挂了,副本也有正确数据
- 分区副本数和ISR最小副本数大于等于2:防止leader挂了,数据丢失
所以生产者经过如上配置后一定不会丢数据,消费者端丢失的情况,在处理完消息后手动提交offset可以解决,但需要处理重复消费问题。
重复消费
重复消费的出现分两种情况
重复落盘导致
由之前的生产者模型可以知道,一个Inflight Request失败时会进行重试,如果这个请求的消息已经持久化了,只是发送ACK时网络故障,那么导致的重试就会引起重复落盘,多出了一条重复数据!
解决方法
- 实现精准一次:幂等性 + 至少一次
- 幂等性用于保证不重复,其依靠的是消息由< pid, partition, seqNum >三元标识符唯一标识,但只限于单分区单会话不重复,重启后新开的会话还是有可能重复
- 于是引入Kafka事务机制,由生产者指定全局唯一的事务id来获取事务协调器所在的Broker,并由它分配pid,保证了多个会话间的pid一致,本质还是幂等性
offset提交导致
- 如果是自动提交offset:频率为5s一次,假设提交offset后2s consumer挂掉了,那这2s消费了的offset没有提交,重启后会再消费一次
- 如果是手动提交:提交offset时消费者进程被重启,即提交失败,重新恢复后同上
解决方法
- 消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键、唯一键等天然的幂等功能
消费顺序
同一个分区内的数据都是有序的,这是由幂等性和请求缓存实现的
- 结合之前的分区器分区规则,我们可以指定key,来保证同一个key的数据都被放到同一分区
- 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性
数据积压
根本原因:消费端出问题,消费速度过慢或者不消费
可能导致:Kafka默认数据过期时间为7天,过期后消息仍未消费会被删除,造成消息丢失
解决办法(提高消费者吞吐量):
- 增加Topic的分区数和消费者数
- 提高每批次拉取的消息条数(500 -> 1000)
参考资料
Q.E.D.