Kafka消息队列

Kafka基础架构

Kafka是一个分布式的基于发布/订阅模式的消息队列

总体上分为 生产者Producer、服务端(集群)Broker、消费者Consumer
其中 Broker 包含三个重要概念:主题Topic、分区Partition、副本Replica

基本流程为:

  • 生产者向一个主题发送消息,消费者订阅一个主题后能接收到该主题上的消息
  • 一个主题的消息可以分布在多个Broker上,称为分区
  • 为了保证高可用,分区的数据引入副本的概念,多个副本会尽量分布在不同的Broker上,并从中选出一个Leader与生产者、消费者通信,具体见后文

基本架构图
image-20220406141233367

生产者基础架构

  • 生产者调用send(topic,value)发送数据,经过拦截器和序列化器处理数据
  • 到达分区器之后,根据分区规则确定分区,发送到该分区对应的缓存队列中(没有则创建)
  • 当缓存队列里的数据大于等于发送批次大小(16K)或到达间隔时间时,唤醒Sender线程
  • Sender线程从缓冲队列读取数据,封装成InflightRequest 请求,通过Selector ,按照Broker节点id为单位批量发送给服务端
  • Broker接收到后应答ACK,若成功则在BufferPool清理对应数据的内存,失败则重试

image-20220406141411612

分区规则:

  • 如果指定了分区,则发送到该分区
  • 自定义分区,可以自定义分区器,实现Partitioner接口,再配置上Class即可
  • 如果没指定分区和自定义分区器,但指定了key,则发送到 key.hashcode() % partitions.size()分区
  • 如果都没指定,则按照粘性分区策略,即随机一个分区,之后的数据发送到该分区,直到发满一个批次(16KB)或已完成,再随机一个不同的分区

Broker基础架构

image-20220406145231119

消费者基础架构

初始化过程

  • 定位Coordinator:Consumer会向最近的Broker询问自己对应的 coordinator所在的Broker
  • 消费者组每个consumer向coordinator发送join group请求,coordinator从中选出一个consumer作为leader,并把要消费的topic信息发送给该leader
  • 该leader负责制定消费方案,并发送给coordinator,由它发送给组内的所有消费者执行

image-20220406151741721

消费方案(消费分区分配策略)

举例:七个分区,消费组有3个

  • Range(默认):均匀按序分配,前面的消费者多消费,如: (0,1,2) (3,4) (5,6)
  • Round Robin:均匀轮询分配,如 (0,3,6) (1,4) (2,5)
  • Sticky Strategy:粘性分配,随机均匀分配

消费过程

image-20220406151146598

ZooKeeper在Kafka中的作用

  • 参考自:博弈史密斯JavaGuide
  • Broker 注册 :在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
  • Topic 注册 : 在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0/brokers/topics/my-topic/Partitions/1
  • 负载均衡 :上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

image-20220406165508707

Kafka文件管理

文件存储

server.properties文件中配置log.dirs=D:\Kafka\kafka_2.13-3.1.0\data,这里会存储消息数据

Kafka中的消息以磁盘文件的方式存储,topic作为逻辑分类,partition作为物理文件格式,文件夹格式为topic-partition,如下👇
image-20220406170446957

每个文件夹都代表一个partition的log数据,为了防止数据太大不好定位和操作,对其进行分片和索引,分为一个个的segment,每个segment最大1G,里面由.log.index.timeindex三种文件,分别用于记录数据,稀疏索引、和时间戳索引

image-20220406170842990

文件清除策略

默认情况下,与.timeindex中最后一条记录的时间超过七天,则会触发文件清除策略

  • 直接删除(默认)
  • 压缩:对于相同key不同value,只保留最新版本

高效读写

  • Kafka本身分布式集群,采用分区技术,并行度高

  • 读取时采用稀疏索引,可以快速定位要消费的数据

  • 顺序读写磁盘

  • 页缓存和零拷贝技术
    具体参考:小林coding

    image-20220406171506522

高可用

  • 每个主题(topic)都设置一定数量的分区(partitions),然后分布在多个节点(brokers)上,组成分布式的消息队列

  • 每个分区都会有若干个副本(replica),分为leader副本和follower副本,它们通常不在同一个服务器节点上,保证了一台服务器宕机时,还能从没宕机的服务器上找到数据

  • 生产者和消费者读写数据都在leader副本上进行,follower主动从leader副本同步数据,不开放follower是为了均衡数据一致性和系统复杂度

  • 如何选取Leader?

    image-20220406113351818

  • Follower故障了怎么办?

    • 临时踢出ISR
    • 恢复后读取本地磁盘上次的HW,将高于HW的数据删除,重新同步
    • 等到LEO赶上该Partition的HW之后再加入ISR
  • Leader故障了怎么办?
    image-20220406115802046

数据可靠

数据可靠,也就是服务器接收到数据至少一次

  • acks级别为-1:所有ISR中的副本都落盘后才返回应答,即使leader应答后挂了,副本也有正确数据
  • 分区副本数和ISR最小副本数大于等于2:防止leader挂了,数据丢失

所以生产者经过如上配置后一定不会丢数据,消费者端丢失的情况,在处理完消息后手动提交offset可以解决,但需要处理重复消费问题。

重复消费

重复消费的出现分两种情况

重复落盘导致

由之前的生产者模型可以知道,一个Inflight Request失败时会进行重试,如果这个请求的消息已经持久化了,只是发送ACK时网络故障,那么导致的重试就会引起重复落盘,多出了一条重复数据!

解决方法

  • 实现精准一次:幂等性 + 至少一次
  • 幂等性用于保证不重复,其依靠的是消息由< pid, partition, seqNum >三元标识符唯一标识,但只限于单分区单会话不重复,重启后新开的会话还是有可能重复
  • 于是引入Kafka事务机制,由生产者指定全局唯一的事务id来获取事务协调器所在的Broker,并由它分配pid,保证了多个会话间的pid一致,本质还是幂等性
  • image-20220406155120674

offset提交导致

  • 如果是自动提交offset:频率为5s一次,假设提交offset后2s consumer挂掉了,那这2s消费了的offset没有提交,重启后会再消费一次
  • 如果是手动提交:提交offset时消费者进程被重启,即提交失败,重新恢复后同上

解决方法

  • 消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键、唯一键等天然的幂等功能

消费顺序

同一个分区内的数据都是有序的,这是由幂等性和请求缓存实现的
image-20220406162835384

  • 结合之前的分区器分区规则,我们可以指定key,来保证同一个key的数据都被放到同一分区
  • 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性

数据积压

根本原因:消费端出问题,消费速度过慢或者不消费

可能导致:Kafka默认数据过期时间为7天,过期后消息仍未消费会被删除,造成消息丢失

解决办法(提高消费者吞吐量):

  • 增加Topic的分区数和消费者数
  • 提高每批次拉取的消息条数(500 -> 1000)

参考资料

Q.E.D.


记录 • 分享 • 日常