575 lines
33 KiB
Markdown
575 lines
33 KiB
Markdown
|
||
|
||
因收到Google相关通知,网站将会择期关闭。相关通知内容
|
||
|
||
|
||
18 PartitionStateMachine:分区状态转换如何实现?
|
||
你好,我是胡夕。今天我们进入到分区状态机(PartitionStateMachine)源码的学习。
|
||
|
||
PartitionStateMachine负责管理Kafka分区状态的转换,和ReplicaStateMachine是一脉相承的。从代码结构、实现功能和设计原理来看,二者都极为相似。上节课我们已经学过了ReplicaStateMachine,相信你在学习这节课的PartitionStateMachine时,会轻松很多。
|
||
|
||
在面试的时候,很多面试官都非常喜欢问Leader选举的策略。学完了今天的课程之后,你不但能够说出4种Leader选举的场景,还能总结出它们的共性。对于面试来说,绝对是个加分项!
|
||
|
||
话不多说,我们就正式开始吧。
|
||
|
||
PartitionStateMachine简介
|
||
|
||
PartitionStateMachine.scala文件位于controller包下,代码结构不复杂,可以看下这张思维导图:
|
||
|
||
|
||
|
||
代码总共有5大部分。
|
||
|
||
|
||
PartitionStateMachine:分区状态机抽象类。它定义了诸如startup、shutdown这样的公共方法,同时也给出了处理分区状态转换入口方法handleStateChanges的签名。
|
||
ZkPartitionStateMachine:PartitionStateMachine唯一的继承子类。它实现了分区状态机的主体逻辑功能。和ZkReplicaStateMachine类似,ZkPartitionStateMachine重写了父类的handleStateChanges方法,并配以私有的doHandleStateChanges方法,共同实现分区状态转换的操作。
|
||
PartitionState接口及其实现对象:定义4类分区状态,分别是NewPartition、OnlinePartition、OfflinePartition和NonExistentPartition。除此之外,还定义了它们之间的流转关系。
|
||
PartitionLeaderElectionStrategy接口及其实现对象:定义4类分区Leader选举策略。你可以认为它们是发生Leader选举的4种场景。
|
||
PartitionLeaderElectionAlgorithms:分区Leader选举的算法实现。既然定义了4类选举策略,就一定有相应的实现代码,PartitionLeaderElectionAlgorithms就提供了这4类选举策略的实现代码。
|
||
|
||
|
||
类定义与字段
|
||
|
||
PartitionStateMachine和ReplicaStateMachine非常类似,我们先看下面这两段代码:
|
||
|
||
// PartitionStateMachine抽象类定义
|
||
abstract class PartitionStateMachine(
|
||
controllerContext: ControllerContext) extends Logging {
|
||
......
|
||
}
|
||
// ZkPartitionStateMachine继承子类定义
|
||
class ZkPartitionStateMachine(config: KafkaConfig,
|
||
stateChangeLogger: StateChangeLogger,
|
||
controllerContext: ControllerContext,
|
||
zkClient: KafkaZkClient,
|
||
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends PartitionStateMachine(controllerContext) {
|
||
// Controller所在Broker的Id
|
||
private val controllerId = config.brokerId
|
||
......
|
||
}
|
||
|
||
|
||
从代码中,可以发现,它们的类定义一模一样,尤其是ZkPartitionStateMachine和ZKReplicaStateMachine,它们接收的字段列表都是相同的。此刻,你应该可以体会到它们要做的处理逻辑,其实也是差不多的。
|
||
|
||
同理,ZkPartitionStateMachine实例的创建和启动时机也跟ZkReplicaStateMachine的完全相同,即:每个Broker进程启动时,会在创建KafkaController对象的过程中,生成ZkPartitionStateMachine实例,而只有Controller组件所在的Broker,才会启动分区状态机。
|
||
|
||
下面这段代码展示了ZkPartitionStateMachine实例创建和启动的位置:
|
||
|
||
class KafkaController(......) {
|
||
......
|
||
// 在KafkaController对象创建过程中,生成ZkPartitionStateMachine实例
|
||
val partitionStateMachine: PartitionStateMachine =
|
||
new ZkPartitionStateMachine(config, stateChangeLogger,
|
||
controllerContext, zkClient,
|
||
new ControllerBrokerRequestBatch(config,
|
||
controllerChannelManager, eventManager, controllerContext,
|
||
stateChangeLogger))
|
||
......
|
||
private def onControllerFailover(): Unit = {
|
||
......
|
||
replicaStateMachine.startup() // 启动副本状态机
|
||
partitionStateMachine.startup() // 启动分区状态机
|
||
......
|
||
}
|
||
}
|
||
|
||
|
||
有句话我要再强调一遍:每个Broker启动时,都会创建对应的分区状态机和副本状态机实例,但只有Controller所在的Broker才会启动它们。如果Controller变更到其他Broker,老Controller所在的Broker要调用这些状态机的shutdown方法关闭它们,新Controller所在的Broker调用状态机的startup方法启动它们。
|
||
|
||
分区状态
|
||
|
||
既然ZkPartitionStateMachine是管理分区状态转换的,那么,我们至少要知道分区都有哪些状态,以及Kafka规定的转换规则是什么。这就是PartitionState接口及其实现对象做的事情。和ReplicaState类似,PartitionState定义了分区的状态空间以及流转规则。
|
||
|
||
我以OnlinePartition状态为例,说明下代码是如何实现流转的:
|
||
|
||
sealed trait PartitionState {
|
||
def state: Byte // 状态序号,无实际用途
|
||
def validPreviousStates: Set[PartitionState] // 合法前置状态集合
|
||
}
|
||
|
||
case object OnlinePartition extends PartitionState {
|
||
val state: Byte = 1
|
||
val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
|
||
}
|
||
|
||
|
||
如代码所示,每个PartitionState都定义了名为validPreviousStates的集合,也就是每个状态对应的合法前置状态集。
|
||
|
||
对于OnlinePartition而言,它的合法前置状态集包括NewPartition、OnlinePartition和OfflinePartition。在Kafka中,从合法状态集以外的状态向目标状态进行转换,将被视为非法操作。
|
||
|
||
目前,Kafka为分区定义了4类状态。
|
||
|
||
|
||
NewPartition:分区被创建后被设置成这个状态,表明它是一个全新的分区对象。处于这个状态的分区,被Kafka认为是“未初始化”,因此,不能选举Leader。
|
||
OnlinePartition:分区正式提供服务时所处的状态。
|
||
OfflinePartition:分区下线后所处的状态。
|
||
NonExistentPartition:分区被删除,并且从分区状态机移除后所处的状态。
|
||
|
||
|
||
下图展示了完整的分区状态转换规则:
|
||
|
||
|
||
|
||
图中的双向箭头连接的两个状态可以彼此进行转换,如OnlinePartition和OfflinePartition。Kafka允许一个分区从OnlinePartition切换到OfflinePartition,反之亦然。
|
||
|
||
另外,OnlinePartition和OfflinePartition都有一根箭头指向自己,这表明OnlinePartition切换到OnlinePartition的操作是允许的。当分区Leader选举发生的时候,就可能出现这种情况。接下来,我们就聊聊分区Leader选举那些事儿。
|
||
|
||
分区Leader选举的场景及方法
|
||
|
||
刚刚我们说了两个状态机的相同点,接下来,我们要学习的分区Leader选举,可以说是PartitionStateMachine特有的功能了。
|
||
|
||
每个分区都必须选举出Leader才能正常提供服务,因此,对于分区而言,Leader副本是非常重要的角色。既然这样,我们就必须要了解Leader选举什么流程,以及在代码中是如何实现的。我们重点学习下选举策略以及具体的实现方法代码。
|
||
|
||
PartitionLeaderElectionStrategy
|
||
|
||
先明确下分区Leader选举的含义,其实很简单,就是为Kafka主题的某个分区推选Leader副本。
|
||
|
||
那么,Kafka定义了哪几种推选策略,或者说,在什么情况下需要执行Leader选举呢?
|
||
|
||
这就是PartitionLeaderElectionStrategy接口要做的事情,请看下面的代码:
|
||
|
||
// 分区Leader选举策略接口
|
||
sealed trait PartitionLeaderElectionStrategy
|
||
// 离线分区Leader选举策略
|
||
final case class OfflinePartitionLeaderElectionStrategy(
|
||
allowUnclean: Boolean) extends PartitionLeaderElectionStrategy
|
||
// 分区副本重分配Leader选举策略
|
||
final case object ReassignPartitionLeaderElectionStrategy
|
||
extends PartitionLeaderElectionStrategy
|
||
// 分区Preferred副本Leader选举策略
|
||
final case object PreferredReplicaPartitionLeaderElectionStrategy
|
||
extends PartitionLeaderElectionStrategy
|
||
// Broker Controlled关闭时Leader选举策略
|
||
final case object ControlledShutdownPartitionLeaderElectionStrategy
|
||
extends PartitionLeaderElectionStrategy
|
||
|
||
|
||
当前,分区Leader选举有4类场景。
|
||
|
||
|
||
OfflinePartitionLeaderElectionStrategy:因为Leader副本下线而引发的分区Leader选举。
|
||
ReassignPartitionLeaderElectionStrategy:因为执行分区副本重分配操作而引发的分区Leader选举。
|
||
PreferredReplicaPartitionLeaderElectionStrategy:因为执行Preferred副本Leader选举而引发的分区Leader选举。
|
||
ControlledShutdownPartitionLeaderElectionStrategy:因为正常关闭Broker而引发的分区Leader选举。
|
||
|
||
|
||
PartitionLeaderElectionAlgorithms
|
||
|
||
针对这4类场景,分区状态机的PartitionLeaderElectionAlgorithms对象定义了4个方法,分别负责为每种场景选举Leader副本,这4种方法是:
|
||
|
||
|
||
offlinePartitionLeaderElection;
|
||
reassignPartitionLeaderElection;
|
||
preferredReplicaPartitionLeaderElection;
|
||
controlledShutdownPartitionLeaderElection。
|
||
|
||
|
||
offlinePartitionLeaderElection方法的逻辑是这4个方法中最复杂的,我们就先从它开始学起。
|
||
|
||
def offlinePartitionLeaderElection(assignment: Seq[Int],
|
||
isr: Seq[Int], liveReplicas: Set[Int],
|
||
uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
|
||
// 从当前分区副本列表中寻找首个处于存活状态的ISR副本
|
||
assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
|
||
// 如果找不到满足条件的副本,查看是否允许Unclean Leader选举
|
||
// 即Broker端参数unclean.leader.election.enable是否等于true
|
||
if (uncleanLeaderElectionEnabled) {
|
||
// 选择当前副本列表中的第一个存活副本作为Leader
|
||
val leaderOpt = assignment.find(liveReplicas.contains)
|
||
if (leaderOpt.isDefined)
|
||
controllerContext.stats.uncleanLeaderElectionRate.mark()
|
||
leaderOpt
|
||
} else {
|
||
None // 如果不允许Unclean Leader选举,则返回None表示无法选举Leader
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
我再画一张流程图,帮助你理解它的代码逻辑:
|
||
|
||
|
||
|
||
这个方法总共接收5个参数。除了你已经很熟悉的ControllerContext类,其他4个非常值得我们花一些时间去探究下。
|
||
|
||
1. assignments
|
||
|
||
这是分区的副本列表。该列表有个专属的名称,叫Assigned Replicas,简称AR。当我们创建主题之后,使用kafka-topics脚本查看主题时,应该可以看到名为Replicas的一列数据。这列数据显示的,就是主题下每个分区的AR。assignments参数类型是Seq[Int]。这揭示了一个重要的事实:AR是有顺序的,而且不一定和ISR的顺序相同!
|
||
|
||
2. isr
|
||
|
||
ISR在Kafka中很有名气,它保存了分区所有与Leader副本保持同步的副本列表。注意,Leader副本自己也在ISR中。另外,作为Seq[Int]类型的变量,isr自身也是有顺序的。
|
||
|
||
3. liveReplicas
|
||
|
||
从名字可以推断出,它保存了该分区下所有处于存活状态的副本。怎么判断副本是否存活呢?可以根据Controller元数据缓存中的数据来判定。简单来说,所有在运行中的Broker上的副本,都被认为是存活的。
|
||
|
||
4. uncleanLeaderElectionEnabled
|
||
|
||
在默认配置下,只要不是由AdminClient发起的Leader选举,这个参数的值一般是false,即Kafka不允许执行Unclean Leader选举。所谓的Unclean Leader选举,是指在ISR列表为空的情况下,Kafka选择一个非ISR副本作为新的Leader。由于存在丢失数据的风险,目前,社区已经通过把Broker端参数unclean.leader.election.enable的默认值设置为false的方式,禁止Unclean Leader选举了。
|
||
|
||
值得一提的是,社区于2.4.0.0版本正式支持在AdminClient端为给定分区选举Leader。目前的设计是,如果Leader选举是由AdminClient端触发的,那就默认开启Unclean Leader选举。不过,在学习offlinePartitionLeaderElection方法时,你可以认为uncleanLeaderElectionEnabled=false,这并不会影响你对该方法的理解。
|
||
|
||
了解了这几个参数的含义,我们就可以研究具体的流程了。
|
||
|
||
代码首先会顺序搜索AR列表,并把第一个同时满足以下两个条件的副本作为新的Leader返回:
|
||
|
||
|
||
该副本是存活状态,即副本所在的Broker依然在运行中;
|
||
该副本在ISR列表中。
|
||
|
||
|
||
倘若无法找到这样的副本,代码会检查是否开启了Unclean Leader选举:如果开启了,则降低标准,只要满足上面第一个条件即可;如果未开启,则本次Leader选举失败,没有新Leader被选出。
|
||
|
||
其他3个方法要简单得多,我们直接看代码:
|
||
|
||
def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
|
||
reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))
|
||
}
|
||
|
||
def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
|
||
assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
|
||
}
|
||
|
||
def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
|
||
assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
|
||
}
|
||
|
||
|
||
可以看到,它们的逻辑几乎是相同的,大概的原理都是从AR,或给定的副本列表中寻找存活状态的ISR副本。
|
||
|
||
讲到这里,你应该已经知道Kafka为分区选举Leader的大体思路了。基本上就是,找出AR列表(或给定副本列表)中首个处于存活状态,且在ISR列表的副本,将其作为新Leader。
|
||
|
||
处理分区状态转换的方法
|
||
|
||
掌握了刚刚的这些知识之后,现在,我们正式来看PartitionStateMachine的工作原理。
|
||
|
||
handleStateChanges
|
||
|
||
前面我提到过,handleStateChanges是入口方法,所以我们先看它的方法签名:
|
||
|
||
def handleStateChanges(
|
||
partitions: Seq[TopicPartition],
|
||
targetState: PartitionState,
|
||
leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]):
|
||
Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
|
||
|
||
|
||
如果用一句话概括handleStateChanges的作用,应该这样说:handleStateChanges把partitions的状态设置为targetState,同时,还可能需要用leaderElectionStrategy策略为partitions选举新的Leader,最终将partitions的Leader信息返回。
|
||
|
||
其中,partitions是待执行状态变更的目标分区列表,targetState是目标状态,leaderElectionStrategy是一个可选项,如果传入了,就表示要执行Leader选举。
|
||
|
||
下面是handleStateChanges方法的完整代码,我以注释的方式给出了主要的功能说明:
|
||
|
||
override def handleStateChanges(
|
||
partitions: Seq[TopicPartition],
|
||
targetState: PartitionState,
|
||
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
|
||
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
|
||
if (partitions.nonEmpty) {
|
||
try {
|
||
// 清空Controller待发送请求集合,准备本次请求发送
|
||
controllerBrokerRequestBatch.newBatch()
|
||
// 调用doHandleStateChanges方法执行真正的状态变更逻辑
|
||
val result = doHandleStateChanges(
|
||
partitions,
|
||
targetState,
|
||
partitionLeaderElectionStrategyOpt
|
||
)
|
||
// Controller给相关Broker发送请求通知状态变化
|
||
controllerBrokerRequestBatch.sendRequestsToBrokers(
|
||
controllerContext.epoch)
|
||
// 返回状态变更处理结果
|
||
result
|
||
} catch {
|
||
// 如果Controller易主,则记录错误日志,然后重新抛出异常
|
||
// 上层代码会捕获该异常并执行maybeResign方法执行卸任逻辑
|
||
case e: ControllerMovedException =>
|
||
error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
|
||
throw e
|
||
// 如果是其他异常,记录错误日志,封装错误返回
|
||
case e: Throwable =>
|
||
error(s"Error while moving some partitions to $targetState state", e)
|
||
partitions.iterator.map(_ -> Left(e)).toMap
|
||
}
|
||
} else { // 如果partitions为空,什么都不用做
|
||
Map.empty
|
||
}
|
||
}
|
||
|
||
|
||
整个方法就两步:第1步是,调用doHandleStateChanges方法执行分区状态转换;第2步是,Controller给相关Broker发送请求,告知它们这些分区的状态变更。至于哪些Broker属于相关Broker,以及给Broker发送哪些请求,实际上是在第1步中被确认的。
|
||
|
||
当然,这个方法的重点,就是第1步中调用的doHandleStateChanges方法。
|
||
|
||
doHandleStateChanges
|
||
|
||
先来看这个方法的实现:
|
||
|
||
private def doHandleStateChanges(
|
||
partitions: Seq[TopicPartition],
|
||
targetState: PartitionState,
|
||
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
|
||
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
|
||
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
|
||
val traceEnabled = stateChangeLog.isTraceEnabled
|
||
// 初始化新分区的状态为NonExistentPartition
|
||
partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
|
||
// 找出要执行非法状态转换的分区,记录错误日志
|
||
val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
|
||
invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
|
||
// 根据targetState进入到不同的case分支
|
||
targetState match {
|
||
......
|
||
}
|
||
}
|
||
|
||
|
||
这个方法首先会做状态初始化的工作,具体来说就是,在方法调用时,不在元数据缓存中的所有分区的状态,会被初始化为NonExistentPartition。
|
||
|
||
接着,检查哪些分区执行的状态转换不合法,并为这些分区记录相应的错误日志。
|
||
|
||
之后,代码携合法状态转换的分区列表进入到case分支。由于分区状态只有4个,因此,它的case分支代码远比ReplicaStateMachine中的简单,而且,只有OnlinePartition这一路的分支逻辑相对复杂,其他3路仅仅是将分区状态设置成目标状态而已,
|
||
|
||
所以,我们来深入研究下目标状态是OnlinePartition的分支:
|
||
|
||
case OnlinePartition =>
|
||
// 获取未初始化分区列表,也就是NewPartition状态下的所有分区
|
||
val uninitializedPartitions = validPartitions.filter(
|
||
partition => partitionState(partition) == NewPartition)
|
||
// 获取具备Leader选举资格的分区列表
|
||
// 只能为OnlinePartition和OfflinePartition状态的分区选举Leader
|
||
val partitionsToElectLeader = validPartitions.filter(
|
||
partition => partitionState(partition) == OfflinePartition ||
|
||
partitionState(partition) == OnlinePartition)
|
||
// 初始化NewPartition状态分区,在ZooKeeper中写入Leader和ISR数据
|
||
if (uninitializedPartitions.nonEmpty) {
|
||
val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
|
||
successfulInitializations.foreach { partition =>
|
||
stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
|
||
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
|
||
controllerContext.putPartitionState(partition, OnlinePartition)
|
||
}
|
||
}
|
||
// 为具备Leader选举资格的分区推选Leader
|
||
if (partitionsToElectLeader.nonEmpty) {
|
||
val electionResults = electLeaderForPartitions(
|
||
partitionsToElectLeader,
|
||
partitionLeaderElectionStrategyOpt.getOrElse(
|
||
throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
|
||
)
|
||
)
|
||
electionResults.foreach {
|
||
case (partition, Right(leaderAndIsr)) =>
|
||
stateChangeLog.info(
|
||
s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
|
||
)
|
||
// 将成功选举Leader后的分区设置成OnlinePartition状态
|
||
controllerContext.putPartitionState(
|
||
partition, OnlinePartition)
|
||
case (_, Left(_)) => // 如果选举失败,忽略之
|
||
}
|
||
// 返回Leader选举结果
|
||
electionResults
|
||
} else {
|
||
Map.empty
|
||
}
|
||
|
||
|
||
虽然代码有点长,但总的步骤就两步。
|
||
|
||
第1步是为NewPartition状态的分区做初始化操作,也就是在ZooKeeper中,创建并写入分区节点数据。节点的位置是/brokers/topics/<topic>/partitions/<partition>,每个节点都要包含分区的Leader和ISR等数据。而Leader和ISR的确定规则是:选择存活副本列表的第一个副本作为Leader;选择存活副本列表作为ISR。至于具体的代码,可以看下initializeLeaderAndIsrForPartitions方法代码片段的倒数第5行:
|
||
|
||
private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
|
||
......
|
||
// 获取每个分区的副本列表
|
||
val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
|
||
// 获取每个分区的所有存活副本
|
||
val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
|
||
val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
|
||
partition -> liveReplicasForPartition
|
||
}
|
||
// 按照有无存活副本对分区进行分组
|
||
// 分为两组:有存活副本的分区;无任何存活副本的分区
|
||
val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
|
||
......
|
||
// 为"有存活副本的分区"确定Leader和ISR
|
||
// Leader确认依据:存活副本列表的首个副本被认定为Leader
|
||
// ISR确认依据:存活副本列表被认定为ISR
|
||
val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
|
||
val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
|
||
......
|
||
}.toMap
|
||
......
|
||
}
|
||
|
||
|
||
第2步是为具备Leader选举资格的分区推选Leader,代码调用electLeaderForPartitions方法实现。这个方法会不断尝试为多个分区选举Leader,直到所有分区都成功选出Leader。
|
||
|
||
选举Leader的核心代码位于doElectLeaderForPartitions方法中,该方法主要有3步。
|
||
|
||
代码很长,我先画一张图来展示它的主要步骤,然后再分步骤给你解释每一步的代码,以免你直接陷入冗长的源码行里面去。
|
||
|
||
|
||
|
||
看着好像图也很长,别着急,我们来一步步拆解下。
|
||
|
||
就像前面说的,这个方法大体分为3步。第1步是从ZooKeeper中获取给定分区的Leader、ISR信息,并将结果封装进名为validLeaderAndIsrs的容器中,代码如下:
|
||
|
||
// doElectLeaderForPartitions方法的第1部分
|
||
val getDataResponses = try {
|
||
// 批量获取ZooKeeper中给定分区的znode节点数据
|
||
zkClient.getTopicPartitionStatesRaw(partitions)
|
||
} catch {
|
||
case e: Exception =>
|
||
return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
|
||
}
|
||
// 构建两个容器,分别保存可选举Leader分区列表和选举失败分区列表
|
||
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
|
||
val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
|
||
// 遍历每个分区的znode节点数据
|
||
getDataResponses.foreach { getDataResponse =>
|
||
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
|
||
val currState = partitionState(partition)
|
||
// 如果成功拿到znode节点数据
|
||
if (getDataResponse.resultCode == Code.OK) {
|
||
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
|
||
// 节点数据中含Leader和ISR信息
|
||
case Some(leaderIsrAndControllerEpoch) =>
|
||
// 如果节点数据的Controller Epoch值大于当前Controller Epoch值
|
||
if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
|
||
val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
|
||
s"already written by another controller. This probably means that the current controller $controllerId went through " +
|
||
s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
|
||
// 将该分区加入到选举失败分区列表
|
||
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
|
||
} else {
|
||
// 将该分区加入到可选举Leader分区列表
|
||
validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
|
||
}
|
||
// 如果节点数据不含Leader和ISR信息
|
||
case None =>
|
||
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
|
||
// 将该分区加入到选举失败分区列表
|
||
failedElections.put(partition, Left(exception))
|
||
}
|
||
// 如果没有拿到znode节点数据,则将该分区加入到选举失败分区列表
|
||
} else if (getDataResponse.resultCode == Code.NONODE) {
|
||
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
|
||
failedElections.put(partition, Left(exception))
|
||
} else {
|
||
failedElections.put(partition, Left(getDataResponse.resultException.get))
|
||
}
|
||
}
|
||
|
||
if (validLeaderAndIsrs.isEmpty) {
|
||
return (failedElections.toMap, Seq.empty)
|
||
}
|
||
|
||
|
||
首先,代码会批量读取ZooKeeper中给定分区的所有Znode数据。之后,会构建两个容器,分别保存可选举Leader分区列表和选举失败分区列表。接着,开始遍历每个分区的Znode节点数据,如果成功拿到Znode节点数据,节点数据包含Leader和ISR信息且节点数据的Controller Epoch值小于当前Controller Epoch值,那么,就将该分区加入到可选举Leader分区列表。倘若发现Zookeeper中保存的Controller Epoch值大于当前Epoch值,说明该分区已经被一个更新的Controller选举过Leader了,此时必须终止本次Leader选举,并将该分区放置到选举失败分区列表中。
|
||
|
||
遍历完这些分区之后,代码要看下validLeaderAndIsrs容器中是否包含可选举Leader的分区。如果一个满足选举Leader的分区都没有,方法直接返回。至此,doElectLeaderForPartitions方法的第一大步完成。
|
||
|
||
下面,我们看下该方法的第2部分代码:
|
||
|
||
// doElectLeaderForPartitions方法的第2部分
|
||
// 开始选举Leader,并根据有无Leader将分区进行分区
|
||
val (partitionsWithoutLeaders, partitionsWithLeaders) =
|
||
partitionLeaderElectionStrategy match {
|
||
case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
|
||
val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
|
||
validLeaderAndIsrs,
|
||
allowUnclean
|
||
)
|
||
// 为OffinePartition分区选举Leader
|
||
leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
|
||
case ReassignPartitionLeaderElectionStrategy =>
|
||
// 为副本重分配的分区选举Leader
|
||
leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
|
||
case PreferredReplicaPartitionLeaderElectionStrategy =>
|
||
// 为分区执行Preferred副本Leader选举
|
||
leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
|
||
case ControlledShutdownPartitionLeaderElectionStrategy =>
|
||
// 为因Broker正常关闭而受影响的分区选举Leader
|
||
leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
|
||
}
|
||
|
||
|
||
这一步是根据给定的PartitionLeaderElectionStrategy,调用PartitionLeaderElectionAlgorithms的不同方法执行Leader选举,同时,区分出成功选举Leader和未选出Leader的分区。
|
||
|
||
前面说过了,这4种不同的策略定义了4个专属的方法来进行Leader选举。其实,如果你打开这些方法的源码,就会发现它们大同小异。基本上,选择Leader的规则,就是选择副本集合中首个存活且处于ISR中的副本作为Leader。
|
||
|
||
现在,我们再来看这个方法的最后一部分代码,这一步主要是更新ZooKeeper节点数据,以及Controller端元数据缓存信息。
|
||
|
||
// doElectLeaderForPartitions方法的第3部分
|
||
// 将所有选举失败的分区全部加入到Leader选举失败分区列表
|
||
partitionsWithoutLeaders.foreach { electionResult =>
|
||
val partition = electionResult.topicPartition
|
||
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
|
||
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
|
||
}
|
||
val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
|
||
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
|
||
// 使用新选举的Leader和ISR信息更新ZooKeeper上分区的znode节点数据
|
||
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
|
||
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
|
||
// 对于ZooKeeper znode节点数据更新成功的分区,封装对应的Leader和ISR信息
|
||
// 构建LeaderAndIsr请求,并将该请求加入到Controller待发送请求集合
|
||
// 等待后续统一发送
|
||
finishedUpdates.foreach { case (partition, result) =>
|
||
result.foreach { leaderAndIsr =>
|
||
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
|
||
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
|
||
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
|
||
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
|
||
leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
|
||
}
|
||
}
|
||
// 返回选举结果,包括成功选举并更新ZooKeeper节点的分区、选举失败分区以及
|
||
// ZooKeeper节点更新失败的分区
|
||
(finishedUpdates ++ failedElections, updatesToRetry)
|
||
|
||
|
||
首先,将上一步中所有选举失败的分区,全部加入到Leader选举失败分区列表。
|
||
|
||
然后,使用新选举的Leader和ISR信息,更新ZooKeeper上分区的Znode节点数据。对于ZooKeeper Znode节点数据更新成功的那些分区,源码会封装对应的Leader和ISR信息,构建LeaderAndIsr请求,并将该请求加入到Controller待发送请求集合,等待后续统一发送。
|
||
|
||
最后,方法返回选举结果,包括成功选举并更新ZooKeeper节点的分区列表、选举失败分区列表,以及ZooKeeper节点更新失败的分区列表。
|
||
|
||
这会儿,你还记得handleStateChanges方法的第2步是Controller给相关的Broker发送请求吗?那么,到底要给哪些Broker发送哪些请求呢?其实就是在上面这步完成的,即这行语句:
|
||
|
||
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
|
||
recipientsPerPartition(partition), partition,
|
||
leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
|
||
|
||
|
||
总结
|
||
|
||
今天,我们重点学习了PartitionStateMachine.scala文件的源码,主要是研究了Kafka分区状态机的构造原理和工作机制。
|
||
|
||
学到这里,我们再来回答开篇面试官的问题,应该就不是什么难事了。现在我们知道了,Kafka目前提供4种Leader选举策略,分别是分区下线后的Leader选举、分区执行副本重分配时的Leader选举、分区执行Preferred副本Leader选举,以及Broker下线时的分区Leader选举。
|
||
|
||
这4类选举策略在选择Leader这件事情上有着类似的逻辑,那就是,它们几乎都是选择当前副本有序集合中的、首个处于ISR集合中的存活副本作为新的Leader。当然,个别选举策略可能会有细小的差别,你可以结合我们今天学到的源码,课下再深入地研究一下每一类策略的源码。
|
||
|
||
我们来回顾下这节课的重点。
|
||
|
||
|
||
PartitionStateMachine是Kafka Controller端定义的分区状态机,负责定义、维护和管理合法的分区状态转换。
|
||
每个Broker启动时都会实例化一个分区状态机对象,但只有Controller所在的Broker才会启动它。
|
||
Kafka分区有4类状态,分别是NewPartition、OnlinePartition、OfflinePartition和NonExistentPartition。其中OnlinPartition是分区正常工作时的状态。NewPartition是未初始化状态,处于该状态下的分区尚不具备选举Leader的资格。
|
||
Leader选举有4类场景,分别是Offline、Reassign、Preferrer Leader Election和ControlledShutdown。每类场景都对应于一种特定的Leader选举策略。
|
||
handleStateChanges方法是主要的入口方法,下面调用doHandleStateChanges私有方法实现实际的Leader选举功能。
|
||
|
||
|
||
下个模块,我们将来到Kafka延迟操作代码的世界。在那里,你能了解Kafka是如何实现一个延迟请求的处理的。另外,一个O(N)时间复杂度的时间轮算法也等候在那里,到时候我们一起研究下它!
|
||
|
||
课后讨论
|
||
|
||
源码中有个triggerOnlineStateChangeForPartitions方法,请你分析下,它是做什么用的,以及它何时被调用?
|
||
|
||
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。
|
||
|
||
|
||
|
||
|