learn-tech/专栏/分布式中间件实践之路(完)/13深入解读基于Kafka和ZooKeeper的分布式消息队列原理.md
2024-10-16 06:37:41 +08:00

486 lines
40 KiB
Markdown
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

因收到Google相关通知网站将会择期关闭。相关通知内容
13 深入解读基于 Kafka 和 ZooKeeper 的分布式消息队列原理
分布式消息队列是互联网领域广泛应用的中间件,在上一课中,我已经介绍了基于 Kafka、ZooKeeper 的分布式消息队列系统的搭建步骤,以及 Java 客户端的使用方法。
对于商业级消息中间件来说可靠性至关重要那么Kafka 是如何确保消息生产、传输、存储及消费过程中的可靠性的呢?本文将从 Kafka 的架构切入,解读 Kafka 基本原理,并对其存储机制、复制原理、同步原理、可靠性和持久性等作详细解读。
1. Kafka 总体架构
基于 Kafka、ZooKeeper 的分布式消息队列系统总体架构如下图所示:
典型的 Kafka 体系架构包括若干 Producer消息生产者若干 Broker作为 Kafka 节点的服务器),若干 Consumer Group以及一个 ZooKeeper 集群。
Kafka 通过 ZooKeeper 管理集群配置、选举 Leader并在 Consumer Group 发生变化时进行 Rebalance即消费者负载均衡在下一课介绍。Producer 使用 Push模式将消息发布到 BrokerConsumer 使用 Pull模式从 Broker 订阅并消费消息。
上图仅描摹了总体架构,并没有对作为 Kafka 节点的 Broker 进行深入刻画。事实上它的内部细节相当复杂如下图所示Kafka 节点涉及 Topic、Partition 两个重要概念。
在 Kafka 架构中,有几个术语需要了解下。
Producer 生产者即消息发送者Push 消息到 Kafka 集群的 Broker就是 Server
Broker Kafka 集群由多个 Kafka 实例Server组成每个实例构成一个 Broker其实就是服务器
Topic Producer 向 Kafka 集群 Push 的消息会被归于某一类别,即 Topic。本质上这只是一个逻辑概念面向的对象是 Producer 和 ConsumerProducer 只需关注将消息 Push 到哪一个 Topic 中,而 Consumer 只需关心自己订阅了哪个 Topic
Partition 每个 Topic 又被分为多个 Partition即物理分区。出于负载均衡的考虑同一个 Topic 的 Partition 分别存储于 Kafka 集群的多个 Broker 上。而为了提高可靠性,这些 Partition 可以由 Kafka 机制中的 Replicas 来设置备份的数量。如上面框架图所示,每个 Partition 都存在两个备份;
Consumer 消费者,从 Kafka 集群的 Broker 中 Pull 消息、消费消息;
Consumer Group High-Level Consumer API 中,每个 Consumer 都属于一个 Consumer Group每条消息只能被 Consumer Group 中的一个 Consumer 消费,但可以被多个 Consumer Group 消费;
Replicas Partition 的副本,保障 Partition 的高可用;
Leader Replicas 中的一个角色, Producer 和 Consumer 只与 Leader 交互;
Follower Replicas 中的一个角色,从 Leader 中复制数据,作为它的副本,同时一旦某 Leader 挂掉,便会从它的所有 Follower 中选举出一个新的 Leader 继续提供服务;
Controller Kafka 集群中的一个服务器,用来进行 Leader Election 以及各种 Fail Over
Zookeeper Kafka 通过 ZooKeeper 存储集群的 Meta 信息等,文中将详述。
1.1 Topic & Partition
为了便于区分消息Producer 向 Kafka 集群 Push 的消息会被归于某一类别,即 Topic。为了负载均衡、增强可扩展性Topic 又被分为多个 Partition。从存储层面来看每一个 Partition 都是一个有序的、不可变的记录序列通俗点就是一个追加日志Append Log文件。每个 Partition 中的记录都会被分配一个称为偏移量Offset的序列 ID 号,该序列 ID 号唯一地标识 Partition 内的每个记录。
Kafka 机制中Producer Push 的消息是追加Append到 Partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下图所示:
来源Kafka 官网)
1.2 Kafka 为什么要将 Topic 分区?
简而言之:负载均衡 + 水平扩展。
前已述及Topic 只是逻辑概念,面向的是 Producer 和 Consumer而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 Broker那么该 Topic 的所有读写请求都将由这个 Broker 处理,吞吐量很容易陷入瓶颈,这显然不适合高吞吐量应用场景。
有了 Partition 概念,假设一个 Topic 被分为 10 个 PartitionKafka 会根据一定的算法将 10 个 Partition 尽可能均匀地分布到不同的 Broker服务器上。当 Producer 发布消息时Producer 客户端可以采用 Random、Key-Hash 及轮询等算法选定目标 Partition。若不指定Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大地提高吞吐量,并且使系统具备良好的水平扩展能力。
那么,如何指定 Partition 的数量呢?通常有两种方式。
通过 Kafka 的配置文件指定
Kafka Server 的配置文件 $KAFKA_HOME/config/server.properties 中有一个参数 num.partitions该参数用来指定 Partition 的数量,如下所示:
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# 这个参数默认是1在实际应用中可自行配置
num.partitions=1
创建 Topic 时动态指定
在上一课《搭建基于 Kafka 和 ZooKeeper 的分布式消息队列》中,我详细介绍过该方法,这里,我再举一个例子,命令形式如下(当然,通过客户端调用接口也是可以的):
# 创建Topic
./kafka-topics.sh --create --zookeeper 192.168.7.100:12181 --replication-factor 2 --partitions 3 --topic mytopic
# 解释
--replication-factor 2 #副本数为2
--partitions 3 #创建3个分区
--topic mytopic #主题名为mytopic
2. Kafka 高可靠性实现基础解读
谈及可靠性最常规、最有效的策略就是“副本Replication机制”Kafka 实现高可靠性同样采用了该策略。通过调节副本相关参数,可使 Kafka 在性能和可靠性之间取得平衡。本节先从 Kafka 文件存储机制入手,从最底层了解 Kafka 的存储细节,进而对消息的存储有个微观的认知。之后通过介绍 Kafka 的复制原理和同步方式来阐述宏观层面的概念。最后介绍 ISR、HW 和 Leader 选举。
2.1 Kafka 文件存储机制
上文已经介绍了 Topic 和 Partition。事实上Partition 并不是最终的存储粒度Partition 还可以进一步细分为 Segment。换言之一个 Partition 物理上由多个 Segment 组成Segment 才是实际的存储粒度。
为了直观地展现 Partition 的构成我结合一个实例加以说明。基于第11课《搭建基于 Kafka 和 ZooKeeper 的分布式消息队列》的内容,我们可以轻松地搭建一个“单机(只有一个 Broker” 消息队列。消息队列搭建好后,通过命令创建一个 Topic名为“mytopic_test”Partition 的数量配置为 4创建 Topic 的命令请见上一课)。之后,可以在 Kafka 日志目录(由 Kafka Server 配置文件 server.properties 中的参数 log.dirs 指定)中看到新生成的文件夹,如下所示:
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-0
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-1
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-2
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-3
通过以上命令,我们创建了一个 名为“mytopic_test”的 Topic同时为它指定了 4 个 Partition也就是上面的 4 个文件夹(由于 Partition 不是最终的存储粒度所以是文件夹而不是文件。Partition 的名称规则为Topic 名称+索引号,索引的范围为:[0num.partitions - 1]。
在初步了解了 Kafka 的文件存储机制后,不知读者是否思考过如下几个问题。
问题1为什么不能以 Partition 作为存储单位?
任何一个 Topic 中的 Partition 数量受限于 Kafka Broker 的数量,不可能太多。如果以 Partition 为存储粒度,随着消息源源不断写入,数量有限的 Partition 将急剧扩张,会对消息文件的维护以及对已消费消息的清理工作带来严重影响。
Partition 的数量为什么受限于 Kafka Broker 的数量?为了保证可靠性,每个 Partition 都应有若干个副本Replica其中一个 Replica 为 Leader其它都为 Follower。Leader 负责处理 Partition 的所有读写请求Follower 则负责被动地复制 Leader 上的数据。不难理解,对于任意一个 Topic有多少个 Partition 就有多少个 Leader并且这些 Leader 应尽量分散到不同的 Broker 上,否则,一旦某台 Broker如果它部署有大量 Leader故障下线势必引起连锁反应大量的 Partition 需要重选 Leader而这期间是不可用的。
鉴于上述原因,有必要对 Partition 进一步细分,细分的单位就是 Segment。很容易理解一个 Partition文件夹相当于一个巨大的文件通过细分被切分为多个大小相等的 Segment 文件,但由于消息大小不同,每个 Segment 中的消息数量通常是不同的。基于 Segment 机制,可以很方便地清理那些陈旧的消息(已被消费的消息),从而提高磁盘的利用率。关于 SegmentKafka Server 有如下配置参数:
log.segment.bytes=1073741824 #指定segment文件的大小因为kafka的消息是以追加的形式落地到文件当超过这个值的时候kafka会新起一个文件
log.retention.hours=168 #默认消息的最大持久化时间168小时7天
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间log.retention.hours=168 ),到目录查看是否有过期的消息,如果有则删除过期消息;
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能;
问题2Segment 的工作原理是什么?
上文已经介绍过Partition 的细分单位为 Segment接下来我们看下它的工作原理。打开一个 Partition 文件夹(目录),如下所示:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
并没有发现“Segment”文件事实上Segment 文件由两部分组成,即 .index 文件和 .log 文件,分别为 Segment 索引文件和数据文件。观察上面的文件名很容易理解它们的命规则对于索引文件Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的偏移量OffsetOffset 的数值由 20 位数字字符表示,没有数字的位置用 0 填充。对于数据文件,其命名与对应的索引文件保持一致即可。
为了便于读者理解,以上面的其中一“对” Segment 文件为例00000000000000170410.index 和 00000000000000170410.log绘制其关系图如下所示
.index 文件作为索引文件,存储的是元数据;.log 文件作为数据文件,存储的是消息。如何通过索引访问具体的消息呢?事实上,索引文件中的元数据指向的是对应数据文件中消息的物理偏移地址,有了消息的物理地址,自然也就可以访问对应的消息了。
其中以 .index 索引文件中的元数据 [2, 365] 为例,在 .log 数据文件表示第 2 个消息,即在全局 Partition 中表示 170410+2=170412 个消息,该消息的物理偏移地址为 365。
问题3如何从 Partition 中通过 Offset 查找 Message
基于问题 2 中的数据和图形,如何读取 offset=170425 的 Message消息关键在于通过 Offset 定位出消息的物理偏移地址。首先,列出各个 Segment 的索引文件及其偏移量范围,如下:
索引文件100000000000000000000.index偏移量范围[0170410]
索引文件200000000000000170410.index偏移量范围[170410+1239430]
索引文件300000000000000239430.index偏移量范围[239430+1239430+1+X]X大于1具体值与消息量和大小有关
根据索引文件的偏移量范围170410< offset=170425 <239430因此Offset=170425 对应的索引文件为 00000000000000170410.index索引文件是有序的”,通过二分查找又称折半查找便可快速定位具体文件位置此后根据 00000000000000170410.index 文件中的 [15,2369] 定位到数据文件 00000000000000170410.log 中的 2369 位置这便是目标消息的位置读取即可
问题4读取一条消息时如何确定何时读完本条消息呢
通过对问题 3 的解答相信读者已经理解了如何根据 Offset 定位一条消息的物理偏移地址但这个物理偏移地址实际上是一个起始地址如何确定本条消息的结尾终止地址
这个问题可通过消息在磁盘上的物理存储结构来解决其中包含偏移量消息体长度等可度量消息终止地址的数据消息的存储结构如下取自官网
baseOffset: int64 #偏移量
batchLength: int32 #消息体长度
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
2.2 复制原理和同步方式
副本机制初探
提高可用性冗余设计也称副本机制是常用的策略Kafka 也不例外 Kafka 副本是以 Partition 为单位换言之每个 Partition N 个副本Replica副本数量可在 Kafka 配置文件 server.properties 中设置
default.replication.factor=2 #复制因子默认值为2如果一个副本失效了另一个还可以继续提供服务从而保证可用性
基于多副本机制Kafka 可实现故障自动转移Fail Over)。副本根据角色的不同可分为以下三类
Leader 副本响应 Client 端读写请求的副本Producer Consumer 只跟 Leader 交互
Follower 副本被动地备份 Leader 副本中的数据不能响应 Client 端读写请求一旦 Leader 挂掉会从它的 Follower 中选举出一个新的 Leader 继续提供服务
ISR 副本包含了 Leader 副本和所有与 Leader 副本保持同步的 Follower 副本如何判定是否与 Leader 同步后面详述
为了便于读者更好地理解副本概念我们看下面这个例子
一个具有 4 Broker Kafka 集群TopicA 3 Partition每个 Partition 3 个副本Leader+Follower)。
如果 Leader 所在的 Broker 发生故障或宕机对应 Partition 将因无 Leader 而不能处理客户端请求这时副本的作用就体现出来了一个新 Leader 将从 Follower 中被选举出来并继续处理客户端的请求
如何确保新选举出的 Leader 是最优秀的
一个 Partition 有多个副本Replica为了提高可靠性这些副本分散在不同的 Broker 由于带宽读写性能网络延迟等因素同一时刻这些副本的状态通常是不一致的 Follower Leader 的状态不一致那么如何保证新 Leader 是优选出来的呢
Kafka 机制中Leader 将负责维护和跟踪一个 ISRIn-Sync Replicas列表即同步副本队列这个列表里面的副本与 Leader 保持同步状态一致如果新的 Leader ISR 列表里的副本中选出那么就可以保证新 Leader 是优秀的当然这不是唯一的策略下文将继续解读
2.3 同步副本 ISR
上节中讲到了同步副本队列 ISRIn-Sync Replicas Leader 副本 + 所有与 Leader 副本保持同步的 Follower 副本虽然副本可以提高可用性但副本数量对 Kafka 的吞吐率有一定影响默认情况下 Kafka Replica 数量为 2部分 Kafka 版本默认值为 1即每个 Partition 都有一个 Leader一个 Follower所有的副本Replica统称为 Assigned ReplicasAR)。显然ISR AR 中的一个子集 Leader 维护 ISR 列表Follower Leader 那里同步数据会有一些延迟由参数 replica.lag.time.max.ms 设置超时阈值超过阈值的 Follower 将被剔除出 ISR 存入 OSROutof-Sync Replicas列表新加入的 Follower 也会先存放在 OSR 即有关系式 AR=ISR+OSR。
LEO & HW
前面提到 Kafka Topic 的每个 Partition 可能有多个副本Replica用于实现冗余从而实现高可用每个副本又有两个重要的属性 LEO HW
通过前面内容的学习我们知道在 Kafka 的存储机制中Partition 可以细分为 Segment Segment 是最终的存储粒度不过对于上层应用来说仍然可以将 Partition 看作最小的存储单元 Partition 可以看作是由一系列的 Segment 组成的粒度更粗的存储单元它由一系列有序的消息组成这些消息被连续的追加到 Partition
LEOHW 以及 Offset 的关系图如下
LEO即日志末端位移Log End Offset表示每个副本的 Log 最后一条 Message 的位置比如 LEO=10、HW=7则表示该副本保存了 10 条消息而后面 3 条处于 Uncommitted 状态
HW即水位值High Watermark)。对于同一个副本而言 HW 值不大于 LEO 小于等于 HW 值的所有消息都被认为是已备份Replicated对于任何一个 Partition取其对应的 ISR 中最小的 LEO 作为 HWConsumer 最多只能消费到 HW 所在的位置
此外需要说明的是每个 Replica 包含 Leader Follower都有 HW它们各自负责更新自己的 HW 的状态
HW 并不是 Kafka 特有的概念HW 通常被用在流式处理领域比如 Apache FlinkApache Spark 以表征元素或事件在时间层面上的进度 Kafka 对于 Leader 新写入的消息Leader 会等待该消息被 ISR 中所有的 Replica 同步后再更新 HW之后该消息才能被 Consumer 消费这种机制有一个好处确保 HW 及其之前的消息Committed 状态都是已备份的即便 Leader 所在的 Broker 因故障下线那么 Committed 状态的消息仍然可以从新选举出的 Leader 中获取
下面我们举例说明
某个 Partition ISR 列表包括 3 个副本1 Leader+2 Follower Producer 向其 Leader 写入一条消息后HW LEO 有如下变化过程
由上图可以看出Kafka 的复制机制既不是完全的同步复制也不是单纯的异步复制同步复制要求所有能工作的 Follower 都复制完这条消息才会被置为 Committed 状态该复制方式受限于复制最慢的 Follower会极大地影响吞吐率因而极少应用于生产环境而异步复制方式下Follower 异步地从 Leader 复制数据数据只要被 Leader 写入 Log 就被认为已经 Committed类似 Redis主从异步复制)。如果在 Follower 尚未复制完成的情况下Leader 宕机则必然导致数据丢失很多时候这是不可接受的
相较于完全同步复制和异步复制Kafka 使用 ISR 的策略则是一种较中庸的策略在可靠性和吞吐率方面取得了较好的平衡某种意义上ISR 策略与第 8 课中介绍的 Raft 算法所采用的多数派原则类似不过 ISR 更为灵活
2.4 Kafka 消息生产的可靠性
Kafka 而言可靠性贯穿消息的生产发送存储及消费等过程本节将介绍消息生产过程中的可靠性即当 Producer Leader 发送消息时的可靠性在客户端可以通过 request.required.acks 参数来设置数据可靠性的级别
1request.required.acks = 1
这是默认情况即消息的强制备份数量为 1Producer 发送数据到 Leader只要 Leader 成功写入本地日志即成功返回客户端不要求 ISR 中的其它副本与 Leader 保持同步对于 Producer 发来的消息如果 Leader 刚写入并成功返回后便宕机此次发送的消息就会丢失
2request.required.acks = 0
即消息的强制备份数量为 0Producer 不停向 Leader 发送数据而不需要 Leader 反馈成功消息这种情况下数据传输效率最高与此同时可靠性也是最低的可能在发送过程中丢失数据可能在 Leader 宕机时丢失数据
3request.required.acks = -1all
即消息的强制备份数量为 ISR 列表中副本的数量Producer 发送数据给 LeaderLeader 收到数据后要等到 ISR 列表中的所有副本都完成数据同步后强一致性才向生产者返回成功消息如果一直收不到成功消息则认为发送数据失败会自动重发数据这是可靠性最高的方案当然性能也会受到一定影响
单纯设置 request.required.acks 保障不了消息生产的可靠性
回顾之前讲得由于 Follower Leader 同步数据有一些延迟由参数 replica.lag.time.max.ms 设置超时阈值超过阈值的 Follower 将被剔除出 ISR 存入 OSROutof-Sync Replicas列表因此ISR 列表实际上是动态变化的
我们思考一个问题由于 ISR 列表是动态变化的如果 ISR 中的副本因网络延迟等原因被踢出只剩下 Leader即便设置参数 request.required.acks=-1 也无法保证可靠性鉴于此需要对 ISR 列表的最小副本数加以约束 ISR 列表中的副本数不得小于一个阈值Kafka 提供了这样一个参数min.insync.replicas该参数用于设定 ISR 中的最小副本数默认值为 1当且仅当 request.required.acks 参数设置为 -1 此参数才生效 ISR 中的副本数少于 min.insync.replicas 配置的数量时客户端会返回如下异常
org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required
不难理解如果 min.insync.replicas 设置为 2 ISR 中实际副本数为 1 只有 Leader将无法保证可靠性此时拒绝客户端的写请求以防止消息丢失
2.5 Leader 选举机制解读
关于 Kafka 的可靠性前面介绍了消息生产过程中的可靠性保障机制设置参数 request.required.acks=-1 对于任意一条消息只有在它被对应 Partition ISR 中的所有副本都复制完毕后才会被认为已提交并返回信息给 Producer进一步通过参数 min.insync.replicas 协同可以提高消息生产过程中的可靠性
问题1如何在保证可靠性的前提下避免吞吐量下降
根据官网的解释如果 Leader 永远不会宕机我们根本不需要 Follower当然这是不可能的考虑这样一个问题 Leader 宕机了怎样从 Follower 中选举出新的 Leader由于各种原因Follower 可能滞后很多或者直接崩溃掉因此我们必须确保选择 最新Up-to-Date)” Follower 作为新的 Leader
对于日志复制算法而言必须提供这样一个基本的保证如果 Leader 挂掉新晋 Leader 必须拥有原来的 Leader 已经 Commit即响应客户端 Push 消息成功的所有消息显然ISR 中的副本都具有该特征
但存在一个问题ISR 列表维持多大的规模合适呢换言之Leader 在一条消息被 Commit 前需要等待多少个 Follower 确认呢等待 Follower 的数量越多 Leader 保持同步的 Follower 就越多可靠性就越高但这也会造成吞吐率的下降
少数服从多数的选举原则
少数服从多数是最常用的选举原则它也称为多数派原则”, 8 课中介绍的 Raft 算法就采用了这种选举原则不过Kafka 并不是采用这种方式
基于该原则如果有 2f+1 个副本那么在一条消息被 Commit 前必须保证有 f+1 Replica 复制完消息同时失败的副本数不能超过 f 否则将丧失选举基础这种方式有个很出色的特点延迟只取决于最快的服务器也就是说如果复制因子是 3则延迟是由更快的 Follower 而不是较慢的 Follower 决定的
当然,“少数服从多数的选举策略也有一些不足为了保证 Leader 选举正常进行它所能容忍的失败 Follower 数比较少如果要容忍 m Follower 故障至少要 2*m+1 个副本换言之在生产环境中为了保证较高的容错率必须维持大量的副本而过多的副本又会反过来制约性能鉴于此这种算法很少在需要保证高吞吐量的系统中使用
问题2Kafka 选举 Leader 的策略是怎样的
关于这一点官网有如下叙述
Leader 选举有很多实用的算法比如 ZooKeeper ZabRaft 以及 Viewstamped Replication就目前而言 Kafka 所使用的 Leader 选举算法相似度最高的是微软的 PacificA 算法
Kafka 通过 ZooKeeper 为每一个 Partition 动态维护了一个 ISR 列表通过前面的学习我们知道 ISR 里的所有 Replica 都与 Leader 保持同步严格得讲为了保证可靠性只有 ISR 里的成员才能有被选为 Leader 的可能通过参数配置unclean.leader.election.enable=false。基于该策略如果有 f+1 个副本一个 Kafka Topic 能在保证不丢失已经 Commit 消息的前提下容忍 f 个副本的失败在大多数使用场景下这种折衷策略是合理的
实际上对于任意一条消息只有它被 ISR 中的所有 Follower 都从 Leader 复制过去才会被认为 Committed并返回信息给 Producer从而保证可靠性但与少数服从多数策略不同的是Kafka ISR 列表中副本的数量不需要超过副本总数的一半即不需要满足多数派原则通常ISR 列表副本数大于等于 2 即可如此便在可靠性和吞吐量方面取得平衡
极端情况下的 Leader 选举策略
前已述及 ISR 中至少有一个 Follower ISR 包括 LeaderKafka 可以确保已经 Commit 的消息不丢失但如果某一个 Partition 的所有 Replica 都挂了所谓的保证自然也就不存在了这种情况下如何进行 Leader 选举呢通常有两种方案
等待 ISR 中任意一个 Replica 恢复过来并且选它作为 Leader希望它仍然有它所有的数据;
选择第一个恢复过来的 Replica不一定非在 ISR 作为 Leader
如何选择呢这就需要在可用性和一致性当中作出抉择如果坚持等待 ISR 中的 Replica 恢复过来不可用的时间可能相对较长如果 ISR 中所有的 Replica 都无法恢复或者数据丢失了这个 Partition 将永远不可用
如果选择方案 2有可能存在这样的情况首先恢复过来的 Replica 不是 ISR 中的 Replica那么它可能并不具备所有已经 Commit 的消息从而造成消息丢失默认情况下 0.11.0.0 版本起Kafka 采用第一种策略 unclean.leader.election.enable=false以保证一致性。unclean.leader.election.enable 这个参数对于 Leader 选举系统可用性一致性以及数据可靠性都有重要影响在生产环境中应慎重权衡
3. Kafka 架构中 ZooKeeper 以怎样的形式存在
ZooKeeper 是一个分布式开放源码的分布式应用程序协调服务 Google Chubby 开源实现分布式应用程序可以基于它实现统一命名服务状态同步服务集群管理分布式应用配置项的管理等工作
在基于 Kafka 的分布式消息队列中ZooKeeper 的作用有 Broker 注册Topic 注册Producer Consumer 负载均衡维护 Partition Consumer 的关系记录消息消费的进度以及 Consumer 注册等
3.1 Broker ZooKeeper 中的注册
为了便于大家理解我首先解释下注册一词ZooKeeper 是一个共享配置中心我们可以将一些信息存放入其中比如 Broker 信息本质上就是存放一个文件目录这个配置中心是共享的分布式系统的各个节点都可以从配置中心访问到相关信息同时ZooKeeper 还具有 Watch 机制参考第 8 课中介绍的 Raft 算法一旦注册信息发生变化比如某个注册的 Broker 下线ZooKeeper 就会删除与之相关的注册信息其它节点可以通过 Watch 机制监听到这一变化进而做出响应
其实ZooKeeper 与第 8 课介绍的 Etcd 有很多共性如果读者已经阅读过 Etcd 相关内容将很容易理解 ZooKeeper 相关内容
言归正传Broker 注册也就是 Kafka 节点注册本质上就是在 ZooKeeper 中创建一个专属的目录又称为节点其路径为 / brokers
读者应该记得上一课在讲述 Kafka 配置时特意强调了节点的唯一标识broker.id它有什么用呢 Broker 启动的同时需要到配置中心ZooKeeper注册 broker.id 作为唯一标识根据它便可在 ZooKeeper 中创建专属节点目录其路径为 /brokers/ids/{broker.id}。
在专属节点创建好后Kafka 会将该 Broker 相关的信息存入其中包括 broker.name 端口号
需要特别说明的是Broker ZooKeeper 中注册的节点是临时节点”,一旦 Broker 故障下线ZooKeeper 就会将该节点删除同时可以基于 Watch 机制监听到这一删除事件进而做出响应如负载均衡)。
3.2 Topic ZooKeeper 中的注册
Kafka 所有 Topic Broker 的对应关系都由 ZooKeeper 来维护 ZooKeeper 通过建立专属的节点来存储这些信息其路径为 /brokers/topics/{topic_name}。
前面说过为了保障数据的可靠性每个 Topic Partition 实际上是存在备份的并且备份的数量由 Kafka 机制中的 Replicas 来控制那么问题来了如下图所示假设某个 TopicA 被分为 2 Partition并且存在两个备份由于这 2 Partition1-2被分布在不同的 Broker 同一个 Partiton 与其备份不能也不应该存储于同一个 Broker Partition1 为例假设它被存储于 Broker2其对应的备份分别存储于 Broker1 Broker4有了备份可靠性得到保障但数据一致性却是个问题
为了保障数据的一致性ZooKeeper 机制得以引入基于 ZooKeeperKafka 为每一个 Partition 找一个节点作为 Leader其余备份作为 Follower接续上图的例子 TopicA Partition1 而言如果位于 Broker2Kafka 节点上的 Partition1 Leader那么位于 Broker1 Broker4 上面的 Partition1 就充当 Follower则有下图
基于上图的架构 Producer Push 的消息写入 Partition分区作为 Leader BrokerKafka 节点会将消息写入自己的分区同时还会将此消息复制到各个 Follower实现同步如果某个 Follower 挂掉Leader 会再找一个替代并同步消息如果 Leader 挂了Follower 们会选举出一个新的 Leader 替代继续业务这些都是由 ZooKeeper 完成的
3.3 Consumer ZooKeeper 中的注册
Consumer Group 注册
BrokerTopic 注册类似Consumer Group 注册本质上也是在 ZooKeeper 中创建专属的节点以记录相关信息其路径为 /consumers/{group_id}。
这里补充一点 ZooKeeper /consumers/{group_id} 虽然被称为节点但本质上是一个目录既然是目录在记录信息时就可以根据信息的不同进一步创建子目录子节点分别记录不同类别的信息对于 Consumer Group 而言有三类信息需要记录因此/consumers/{group_id} 下还有三个子目录如下所示
idsConsumer Group 中有多个 Consumerids 用于记录这些 Consumer
owners记录该 Consumer Group 可消费的 Topic 信息
offsets记录 owners 中每个 Topic 的所有 Partition Offset
Consumer 注册
原理同 Consumer Group 注册不过需要注意的是其节点路径比较特殊需在路径 / consumers/{group_id}/ids 下创建专属子节点它是临时的比如 Consumer 的临时节点路径为 / consumers/{group_id}/ids/my_consumer_for_test-1223234-fdfv1233df23
负载均衡
通过前面的学习我们知道对于一条消息订阅了它的 Consumer Group 中只能有一个 Consumer 消费它那么就存在一个问题一个 Consumer Group 中有多个 Consumer如何让它们尽可能均匀地消费订阅的消息呢也就是负载均衡这里不讨论实现细节但要实现负载均衡实时获取 Consumer 的数量显然是必要的通过 Watch 机制监听 / consumers/{group_id}/ids 下子节点的事件便可实现
3.4 Producers 负载均衡
前面已经介绍过为了负载均衡和避免连锁反应Kafka 同一个 Topic Partition 会尽量分散到不同的 Broker Producers 则根据指定的 Topic 将消息 Push 到相应的 Partition那么如何将消息均衡地 Push 到各个 Partition 这便是 Producers 负载均衡的问题
Producers 启动后同样也要进行注册依然是创建一个专属的临时节点为了负载均衡Producers 会通过 Watcher 机制监听 Brokers 注册节点的变化一旦 Brokers 发生变化如增加减少Producers 可以收到通知并更新自己记录的 Broker 列表 此外基于 ZooKeeper 提供的 Watcher 机制还可以监听其它在 ZooKeeper 上注册的节点 TopicConsumer
Producer Kafka 集群 Push 消息的时候必须指定 Topic不过Partition 却是非必要的事实上目前高级的客户端已经不提供指定 Partition 的接口虽然不提供但并不代表无须指定 Partition只是隐藏了细节通常有两种方式用于指定 Partition
低级接口
在指定 Topic 的同时需指定 Partition 编号012……N消息数据将被 Push 到指定的 Partition 从负载均衡的角度看这并不是一种友好的方式
高级接口
不支持指定 Partition隐藏相关细节内部则采用轮询对传入 Key 进行 Hash 等策略将消息数据均衡地发送到各个 Partition此外有一些 Kafka 客户端还支持自定义负载均衡策略
3.5 Consumer 负载均衡
基于 Producer 的负载均衡策略对于任意一个 Topic其各个 Partition 中消息量相对均衡进一步对于 Topic 的任意一条消息订阅了它的任何一个 Consumer Group 中都只能有一个 Consumer 消费它在此约束下如何实现 Consumer 均衡地消费消息呢
一种最朴实的想法是对于订阅的 Topic既然 Partition 中的消息是均衡的那么可以为 Consumer Group 中的各个 Consumer 分别指定不同的 Partition只要保证该过程相对公平即可不过需要注意的是Consume Group Consumer 的数量是动态变化的Topic Partition 数量也不是固定值如何均匀分配呢
借助 ZooKeeper 实现负载均衡
Consumer 消费消息时高级别 API 只需指定 Topic 即可隐藏了负载均衡策略而低级别的 API 通常需要同时指定 Topic Partition需要自行实现负载均衡策略高级别 API 的负载均衡策略需借助 ZooKeeper 实现具体原理如下
前已述及Consumer GroupConsumerBroker 都会在 ZooKeeper 中注册节点因此基于 ZooKeeper 提供的 WatcherConsumer 可以监听同一 Group Consumers 的变化以及 Broker 列表的变化进一步根据 Consumer 列表 Partition 排序后轮流进行分配由于这是一个动态过程相应的负载均衡被称为 Rebalance其描述如下
对任意一个 Topic 中所有的 Partirtion 进行排序用数组 PT 记录
某一 Consumer Group 订阅了上述 Topic对它的所有 Consumer 排序用数组 CG 记录 i Consumer 记为 CG[i]
比例系数为 F=size(PT)/size(CG),向上取整;
解除 CG[i] 对原来分配的 Partition 的消费权i 0 开始最大值为 size(CG)-1;
将第 i*F (i+1)*F-1 Partition 分配给 CG[i]。 
3.6 记录消费进度 Offset
3.3 节中曾提及 Offset它是 /consumers/[group_id] 下的一个子节点Kafka Consumer 采用 Pull 模式消费相应 Partition 中的消息是一种异步消费模式为了避免因 Consumer 故障重启Rebalance 等原因造成重复消费遗漏消费消息需要记录 Consumer Partition 中消息的消费进度即偏移量 OffsetOffset ZooKeeper 有一个专属的节点目录用于记录 Offset其路径样式如下
#节点内容就是Offset的值。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
需要说明的是 Kafka 的最新版本 Kafka 2.0 Offset 信息不再记录于 ZooKeeper而是保存于 Kafka Topic 路径如下
__consumer_offsets(/brokers/topics/__consumer_offsets)
3.7 记录 Partition Consumer 的关系
Consumer Group ZooKeeper 上的注册节点为 /consumers/[group_id] Consumer Group 中的 Consumer ZooKeeper 上的注册节点为 /consumers/[group_id] 下的子节点 owners它们共享一个 Group ID为了 Consumer 负载均衡同一个 Group 订阅的 Topic 下的任一 Partition 都只能分配给一个 ConsumerPartition Consumer 的对应关系也需要在 ZooKeeper 中记录路径为
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
补充这个路径也是一个临时节点进行 Rebalance 时会被删除而后依据新的对应关系重建此外[broker_id-partition_id] 是一个消息分区的标识其内容就是该消息分区消费者的 Consumer ID通常采用 hostname:UUID 形式表示
4. 全程解析Producer-kafka-Consumer
4.1 Producer 发布消息
Producer 采用 Push 模式将消息发布到 Kafka Broker根据负载均衡算法如轮询Hash 这些消息将均衡写入到相应 Topic 对应的各个 Partition 在存储层面采用顺序写磁盘 Append模式写入详细流程如下
Producer Push 消息基于负载均衡算法获得目标 Partition Producer 先从 ZooKeeper /brokers/.../state 节点找到该 Partition Leader
Producer 将消息发送给该 Leader
Leader 将消息写入本地 Log
所有 Follower 主动从 Leader Pull 消息写入本地 Log 后向 Leader 发送 ACK
Leader 收到所有 ISR 中所有 Replica ACK 更新 HWHigh Watermark最后 Commit Offset并向 Producer 发送 ACK
Producer 接到 ACK确认发送成功
4.2 Broker 存储消息
Topic 是逻辑概念 Topic 对应的 Partition 则是物理概念每个 Partition 在存储层面都对应一个文件夹目录)。由于 Partition 并不是最终的存储粒度该文件夹下还有多个 Segment消息索引和数据文件它们是真正的存储文件)。
4.3 Consumer 消费消息
前面介绍过目前采用的高级 APIConsumer 在消费消息时只需指定 Topic 即可API 内部实现负载均衡并将 Offset 记录到 ZooKeeper
值得一提的是Consumer 采用 Pull 模式从 Broker 中读取数据这是一种异步消费模式 Producer 采用的 Push 模式全然不同Push 模式追求速度越快越好当然它取决于 Broker 的性能 Pull 模式则是追求自适应能力Consumer 根据自己的消费能力消费
参考文献
Kafka 官方文档