modify
This commit is contained in:
parent
ca9ccf5441
commit
aeb77a7b92
@ -124,12 +124,11 @@ TODO
|
||||
|
||||
## 七、Kafka
|
||||
|
||||
1. [Kafka 核心概念介绍](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka核心概念介绍.md)
|
||||
1. [Kafka 简介](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka简介.md)
|
||||
2. [基于Zookeeper搭建Kafka高可用集群](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/基于Zookeeper搭建Kafka高可用集群.md)
|
||||
3. [Kafka生产者详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka生产者详解.md)
|
||||
4. [Kafka消费者详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka消费者详解.md)
|
||||
5. Kafka 副本机制以及选举原理剖析
|
||||
6. Kafka的数据可靠性
|
||||
3. [Kafka 生产者详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka生产者详解.md)
|
||||
4. [Kafka 消费者详解](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka消费者详解.md)
|
||||
5. [深入理解Kafka副本机制](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka深入理解分区副本机制.md)
|
||||
|
||||
## 八、Zookeeper
|
||||
|
||||
|
@ -27,6 +27,11 @@
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.2.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.12</artifactId>
|
||||
<version>2.2.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-nop</artifactId>
|
||||
|
@ -79,7 +79,7 @@ try {
|
||||
}
|
||||
```
|
||||
|
||||
> 本片文章的所有示例代码可以从Github上进行下载:[kafka-basis](https://github.com/heibaiying/BigData-Notes/tree/master/code/Kafka/kafka-basis)
|
||||
> 本篇文章的所有示例代码可以从Github上进行下载:[kafka-basis](https://github.com/heibaiying/BigData-Notes/tree/master/code/Kafka/kafka-basis)
|
||||
|
||||
## 三、 自动提交偏移量
|
||||
|
||||
|
161
notes/Kafka深入理解分区副本机制.md
Normal file
161
notes/Kafka深入理解分区副本机制.md
Normal file
@ -0,0 +1,161 @@
|
||||
# 深入理解Kafka副本机制
|
||||
|
||||
<nav>
|
||||
<a href="#一Kafka集群">一、Kafka集群</a><br/>
|
||||
<a href="#二副本机制">二、副本机制</a><br/>
|
||||
<a href="#21-分区和副本">2.1 分区和副本</a><br/>
|
||||
<a href="#22-ISR机制">2.2 ISR机制</a><br/>
|
||||
<a href="#23-不完全的首领选举">2.3 不完全的首领选举</a><br/>
|
||||
<a href="#24-最少同步副本">2.4 最少同步副本</a><br/>
|
||||
<a href="#25-发送确认">2.5 发送确认</a><br/>
|
||||
<a href="#三数据请求">三、数据请求</a><br/>
|
||||
<a href="#31-元数据请求机制">3.1 元数据请求机制</a><br/>
|
||||
<a href="#32-数据可见性">3.2 数据可见性</a><br/>
|
||||
<a href="#33-零拷贝">3.3 零拷贝</a><br/>
|
||||
<a href="#四物理存储">四、物理存储</a><br/>
|
||||
<a href="#41-分区分配">4.1 分区分配</a><br/>
|
||||
<a href="#42-分区数据保留规则">4.2 分区数据保留规则</a><br/>
|
||||
<a href="#43-文件格式">4.3 文件格式</a><br/>
|
||||
<a href="#"></a><br/>
|
||||
</nav>
|
||||
|
||||
|
||||
## 一、Kafka集群
|
||||
|
||||
Kafka使用Zookeeper来维护集群成员(brokers)的信息。每个broker都有一个唯一标识`broker.id`,用于标识自己在集群中的身份,可以在配置文件`server.properties`中进行配置,或者由程序自动生成。下面是Kafka brokers集群自动创建的过程:
|
||||
|
||||
+ 每一个broker启动的时候,它会在Zookeeper的`/brokers/ids`路径下创建一个`临时节点`,并将自己的`broker.id`写入,从而将自身注册到集群;
|
||||
+ 当有多个broker时,所有broker会竞争性地在Zookeeper上创建`/controller`节点,由于Zookeeper上的节点不会重复,所以必然只会有一个broker创建成功,此时该broker称为controller broker。它除了具备其他broker的功能外,**还负责管理主题分区及其副本的状态**。
|
||||
+ 当broker出现宕机或者主动退出从而导致其持有的Zookeeper会话超时时,会触发注册在Zookeeper上的watcher事件,此时Kafka会进行相应的容错处理;如果宕机的是controller broker时,还会触发新的controller选举。
|
||||
|
||||
## 二、副本机制
|
||||
|
||||
为了保证高可用,kafka的分区是多副本的,如果一个副本丢失了,那么还可以从其他副本中获取分区数据。但是这要求对应副本的数据必须是完整的,这是Kafka数据一致性的基础,所以才需要使用`controller broker`来进行专门的管理。下面将详解介绍Kafka的副本机制。
|
||||
|
||||
### 2.1 分区和副本
|
||||
|
||||
Kafka 的主题被分为多个分区 ,分区是Kafka最基本的存储单位。每个分区可以有多个副本(可以在创建主题时使用` replication-factor`参数进行指定)。其中一个副本是首领副本(Leader replica),所有的事件都直接发送给首领副本;其他副本是跟随者副本(Follower replica),需要通过复制来保持与首领副本数据一致,当首领副本不可用时,其中一个跟随者副本将成为新首领。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-cluster.png"/> </div>
|
||||
|
||||
### 2.2 ISR机制
|
||||
|
||||
每个分区都有一个ISR(in-sync Replica)列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:
|
||||
|
||||
+ 与Zookeeper之间有一个活跃的会话,即必须定时向Zookeeper发送心跳;
|
||||
+ 在规定的时间内从首领副本那里低延迟地获取过消息。
|
||||
|
||||
如果副本不满足上面条件的话,就会被从ISR列表中移除,直到满足条件才会被再次加入。
|
||||
|
||||
这里给出一个主题创建的示例:使用`--replication-factor`指定副本系数为3,创建成功后使用`--describe `命令可以看到分区0的有0,1,2三个副本,且三个副本都在ISR列表中,其中1为首领副本。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-分区副本.png"/> </div>
|
||||
|
||||
### 2.3 不完全的首领选举
|
||||
|
||||
对于副本机制,在broker级别有一个可选的配置参数`unclean.leader.election.enable`,默认值为fasle,代表禁止不完全的首领选举。这是针对当首领副本挂掉且ISR中没有其他可用副本时,是否允许某个不完全同步的副本成为首领副本,这可以会导致数据丢失或者数据不一致,在某些对数据一致性要求较高的场景(如金融领域),这可能无法容忍的,所以其默认值为false,如果你能够允许部分数据不一致的话,可以配置为true。
|
||||
|
||||
### 2.4 最少同步副本
|
||||
|
||||
ISR机制的另外一个相关参数是`min.insync.replicas` , 可以在broker或者主题级别进行配置,代表ISR列表中至少要有几个可用副本。这里假设设置为2,那么当可用副本数量小于该值时,就认为整个分区处于不可用状态。此时客户端再向分区写入数据时候就会抛出异常`org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。`
|
||||
|
||||
### 2.5 发送确认
|
||||
|
||||
Kafka在生产者上有一个可选的参数ack,该参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
|
||||
|
||||
- acks=0 :消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
|
||||
- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
|
||||
- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
|
||||
|
||||
## 三、数据请求
|
||||
|
||||
### 3.1 元数据请求机制
|
||||
|
||||
在所有副本中,只有领导副本才能进行消息的读写处理。由于不同分区的领导副本可能在不同的broker上,如果某个broker收到了一个分区请求,但是该分区的领导副本并不在该broker上,那么它就会向客户端返回一个`Not a Leader for Partition`的错误响应。 为了解决这个问题,Kafka提供了元数据请求机制。
|
||||
|
||||
首先集群中的每个broker都会缓存所有主题的分区副本信息,客户端会定期发送发送元数据请求,然后将获取的元数据进行缓存。定时刷新元数据的时间间隔可以通过为客户端配置`metadata.max.age.ms`来进行指定。
|
||||
|
||||
如果在定时请求的时间间隔内发生的分区副本的选举,则意味着原来缓存的信息可能已经过时了,此时有可能会收到`Not a Leader for Partition`的错误响应,这种情况下客户端会再次求发出元数据请求,然后刷新本地缓存,之后再去正确的broker上执行对应的操作,过程如下图:
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-元数据请求.png"/> </div>
|
||||
|
||||
### 3.2 数据可见性
|
||||
|
||||
需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本(ISR中所有副本)都保存了的数据才能被客户端读取到。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-数据可见性.png"/> </div>
|
||||
|
||||
### 3.3 零拷贝
|
||||
|
||||
Kafka所有数据的写入和读取都是通过零拷贝来实现的。传统拷贝与零拷贝的区别如下:
|
||||
|
||||
#### 传统模式下的四次拷贝与四次上下文切换
|
||||
|
||||
以将磁盘文件通过网络发送为例。传统模式下,一般使用如下伪代码所示的方法先将文件数据读入内存,然后通过Socket将内存中的数据发送出去。
|
||||
|
||||
```java
|
||||
buffer = File.read
|
||||
Socket.send(buffer)
|
||||
```
|
||||
|
||||
这一过程实际上发生了四次数据拷贝。首先通过系统调用将文件数据读入到内核态Buffer(DMA拷贝),然后应用程序将内存态Buffer数据读入到用户态Buffer(CPU拷贝),接着用户程序通过Socket发送数据时将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝),最后通过DMA拷贝将数据拷贝到NIC Buffer。同时,还伴随着四次上下文切换,如下图所示:
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-BIO.png"/> </div>
|
||||
|
||||
#### sendfile和transferTo实现零拷贝
|
||||
|
||||
Linux 2.4+内核通过`sendfile`系统调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件到网络发送由一个`sendfile`调用完成,整个过程只有两次上下文切换,因此大大提高了性能。零拷贝过程如下图所示:
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-零拷贝.png"/> </div>
|
||||
|
||||
从具体实现来看,Kafka的数据传输通过TransportLayer来完成,其子类`PlaintextTransportLayer`的`transferFrom`方法通过调用Java NIO中FileChannel的`transferTo`方法实现零拷贝,如下所示:
|
||||
|
||||
```java
|
||||
@Override
|
||||
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
|
||||
return fileChannel.transferTo(position, count, socketChannel);
|
||||
}
|
||||
```
|
||||
|
||||
**注:** `transferTo`和`transferFrom`并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关,如果操作系统提供`sendfile`这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。
|
||||
|
||||
## 四、物理存储
|
||||
|
||||
### 4.1 分区分配
|
||||
|
||||
在创建主题时,Kafka会首先决定如何在broker间分配分区副本,它遵循以下原则:
|
||||
|
||||
+ 在所有broker上均匀地分配分区副本;
|
||||
+ 确保分区的每个副本分布在不同的broker上;
|
||||
+ 如果使用了`broker.rack`参数为broker指定了机架信息,那么会尽可能的把每个分区的副本分配到不同机架的broker上,以避免一个机架不可用而导致整个分区不可用。
|
||||
|
||||
基于以上原因,如果你在一个单节点上创建一个3副本的主题,通常会抛出下面的异常:
|
||||
|
||||
```properties
|
||||
Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor Exception: Replication factor: 3 larger than available brokers: 1.
|
||||
```
|
||||
|
||||
### 4.2 分区数据保留规则
|
||||
|
||||
保留数据是 Kafka 的一个基本特性, 但是Kafka不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反, Kafka为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。分别对应以下四个参数:
|
||||
|
||||
- `log.retention.bytes` :删除数据前允许的最大数据量;默认值-1,代表没有限制;
|
||||
- `log.retention.ms`:保存数据文件的毫秒数,如果未设置,则使用`log.retention.minutes`中的值,默认为null;
|
||||
- `log.retention.minutes`:保留数据文件的分钟数,如果未设置,则使用`log.retention.hours`中的值,默认为null;
|
||||
- `log.retention.hours`:保留数据文件的小时数,默认值为168,也就是一周。
|
||||
|
||||
因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以Kafka把分区分成若干个片段,当前正在写入数据的片段叫作活跃片段。活动片段永远不会被删除。如果按照默认值保留数据一周,而且每天使用一个新片段,那么你就会看到,在每天使用一个新片段的同时会删除一个最老的片段,所以大部分时间该分区会有7个片段存在。
|
||||
|
||||
### 4.3 文件格式
|
||||
|
||||
通常保存在磁盘上的数据格式与生产者发送过来消息格式是一样的。 如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送(格式如下所示) ,然后保存到磁盘上。之后消费者读取后再自己解压这个包装消息,获取每条消息的具体信息。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-compress-message.png"/> </div>
|
||||
|
||||
###
|
||||
|
||||
## 参考资料
|
||||
|
||||
1. Neha Narkhede, Gwen Shapira ,Todd Palino(著) , 薛命灯(译) . Kafka权威指南 . 人民邮电出版社 . 2017-12-26
|
||||
|
||||
2. [Kafka高性能架构之道](http://www.jasongj.com/kafka/high_throughput/)
|
@ -73,7 +73,7 @@ public class SimpleProducer {
|
||||
}
|
||||
```
|
||||
|
||||
> 本片文章的所有示例代码可以从Github上进行下载:[kafka-basis](https://github.com/heibaiying/BigData-Notes/tree/master/code/Kafka/kafka-basis)
|
||||
> 本篇文章的所有示例代码可以从Github上进行下载:[kafka-basis](https://github.com/heibaiying/BigData-Notes/tree/master/code/Kafka/kafka-basis)
|
||||
|
||||
### 2.3 测试
|
||||
|
||||
@ -124,6 +124,17 @@ bin/kafka-topics.sh --create \
|
||||
|
||||
|
||||
|
||||
### 2.4 可能出现的问题
|
||||
|
||||
在这里可能出现的一个问题是:即使你在程序中配置的服务器地址是完全正确的,但是生产者程序在启动后,却一直处于等待状态,然后抛出连接超时的异常。这通常出现在你使用默认配置启动Kafka的情况下,这时候只需要对`server.properties`中`listeners`配置进行如下修改即可:
|
||||
|
||||
```shell
|
||||
# hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
|
||||
listeners=PLAINTEXT://hadoop001:9092
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 二、发送消息
|
||||
|
||||
上面的示例程序调用了`send`方法发送消息后没有做任何操作,在这种情况下,我们是没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。
|
||||
@ -296,7 +307,7 @@ score:5, partition=0,
|
||||
|
||||
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
|
||||
|
||||
- acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应;
|
||||
- acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
|
||||
- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
|
||||
- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Kafka核心概念介绍
|
||||
# Kafka简介
|
||||
|
||||
<nav>
|
||||
<a href="#一Kafka简介">一、Kafka简介</a><br/>
|
||||
@ -10,7 +10,7 @@
|
||||
</nav>
|
||||
|
||||
|
||||
## 一、Kafka简介
|
||||
## 一、简介
|
||||
|
||||
ApacheKafka是一个分布式的流处理平台。它具有以下特点:
|
||||
|
||||
@ -20,7 +20,7 @@ ApacheKafka是一个分布式的流处理平台。它具有以下特点:
|
||||
+ 高吞吐率,单broker可以轻松处理数千个分区以及每秒百万级的消息量;
|
||||
+ 能保证消息的可靠性投递。
|
||||
|
||||
## 二、Kafka核心概念
|
||||
## 二、基本概念
|
||||
|
||||
### 2.1 Messages And Batches
|
||||
|
@ -1,5 +1,6 @@
|
||||
# 基于Zookeeper搭建Kafka高可用集群
|
||||
<nav>
|
||||
# 基于Zookeeper搭建Kafka高可用集群
|
||||
|
||||
<nav>
|
||||
<a href="#一Zookeeper集群搭建">一、Zookeeper集群搭建</a><br/>
|
||||
<a href="#11-下载--解压">1.1 下载 & 解压</a><br/>
|
||||
<a href="#12-修改配置">1.2 修改配置</a><br/>
|
||||
@ -14,227 +15,223 @@
|
||||
<a href="#25-创建测试主题">2.5 创建测试主题</a><br/>
|
||||
</nav>
|
||||
|
||||
## 一、Zookeeper集群搭建
|
||||
|
||||
为保证集群高可用,Zookeeper集群的节点数最好是奇数,最少有三个节点,所以这里搭建一个三个节点的集群。
|
||||
|
||||
### 1.1 下载 & 解压
|
||||
|
||||
下载对应版本Zookeeper,这里我下载的版本`3.4.14`。官方下载地址:https://archive.apache.org/dist/zookeeper/
|
||||
|
||||
```shell
|
||||
# 下载
|
||||
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
|
||||
# 解压
|
||||
tar -zxvf zookeeper-3.4.14.tar.gz
|
||||
```
|
||||
|
||||
### 1.2 修改配置
|
||||
|
||||
拷贝三份zookeeper安装包。分别进入安装目录的`conf`目录,拷贝配置样本`zoo_sample.cfg `为`zoo.cfg`并进行修改,修改后三份配置文件内容分别如下:
|
||||
|
||||
zookeeper01配置:
|
||||
|
||||
```shell
|
||||
tickTime=2000
|
||||
initLimit=10
|
||||
syncLimit=5
|
||||
dataDir=/usr/local/zookeeper-cluster/data/01
|
||||
dataLogDir=/usr/local/zookeeper-cluster/log/01
|
||||
clientPort=2181
|
||||
|
||||
# server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里
|
||||
# 指名集群间通讯端口和选举端口
|
||||
server.1=127.0.0.1:2287:3387
|
||||
server.2=127.0.0.1:2288:3388
|
||||
server.3=127.0.0.1:2289:3389
|
||||
```
|
||||
|
||||
> 如果是多台服务器,则集群中每个节点通讯端口和选举端口可相同,IP地址修改为每个节点所在主机IP即可。
|
||||
|
||||
zookeeper02配置,与zookeeper01相比,只有`dataLogDir`和`dataLogDir`不同:
|
||||
|
||||
```shell
|
||||
tickTime=2000
|
||||
initLimit=10
|
||||
syncLimit=5
|
||||
dataDir=/usr/local/zookeeper-cluster/data/02
|
||||
dataLogDir=/usr/local/zookeeper-cluster/log/02
|
||||
clientPort=2182
|
||||
|
||||
server.1=127.0.0.1:2287:3387
|
||||
server.2=127.0.0.1:2288:3388
|
||||
server.3=127.0.0.1:2289:3389
|
||||
```
|
||||
|
||||
zookeeper03配置,与zookeeper01,02相比,也只有`dataLogDir`和`dataLogDir`不同:
|
||||
|
||||
```shell
|
||||
tickTime=2000
|
||||
initLimit=10
|
||||
syncLimit=5
|
||||
dataDir=/usr/local/zookeeper-cluster/data/03
|
||||
dataLogDir=/usr/local/zookeeper-cluster/log/03
|
||||
clientPort=2183
|
||||
|
||||
server.1=127.0.0.1:2287:3387
|
||||
server.2=127.0.0.1:2288:3388
|
||||
server.3=127.0.0.1:2289:3389
|
||||
```
|
||||
|
||||
> 配置参数说明:
|
||||
>
|
||||
> - **tickTime**:用于计算的基础时间单元。比如session超时:N*tickTime;
|
||||
> - **initLimit**:用于集群,允许从节点连接并同步到 master节点的初始化连接时间,以tickTime的倍数来表示;
|
||||
> - **syncLimit**:用于集群, master主节点与从节点之间发送消息,请求和应答时间长度(心跳机制);
|
||||
> - **dataDir**:数据存储位置;
|
||||
> - **dataLogDir**:日志目录;
|
||||
> - **clientPort**:用于客户端连接的端口,默认2181
|
||||
|
||||
|
||||
|
||||
### 1.3 标识节点
|
||||
|
||||
分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。Zookeeper集群通过`myid`文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出leader节点。
|
||||
|
||||
创建存储目录:
|
||||
|
||||
```shell
|
||||
# dataDir
|
||||
mkdir -vp /usr/local/zookeeper-cluster/data/01
|
||||
# dataDir
|
||||
mkdir -vp /usr/local/zookeeper-cluster/data/02
|
||||
# dataDir
|
||||
mkdir -vp /usr/local/zookeeper-cluster/data/03
|
||||
```
|
||||
|
||||
创建并写入节点标识到`myid`文件:
|
||||
|
||||
```shell
|
||||
#server1
|
||||
echo "1" > /usr/local/zookeeper-cluster/data/01/myid
|
||||
#server2
|
||||
echo "2" > /usr/local/zookeeper-cluster/data/02/myid
|
||||
#server3
|
||||
echo "3" > /usr/local/zookeeper-cluster/data/03/myid
|
||||
```
|
||||
|
||||
### 1.4 启动集群
|
||||
|
||||
分别启动三个节点:
|
||||
|
||||
```shell
|
||||
# 启动节点1
|
||||
/usr/app/zookeeper-cluster/zookeeper01/bin/zkServer.sh start
|
||||
# 启动节点2
|
||||
/usr/app/zookeeper-cluster/zookeeper02/bin/zkServer.sh start
|
||||
# 启动节点3
|
||||
/usr/app/zookeeper-cluster/zookeeper03/bin/zkServer.sh start
|
||||
```
|
||||
|
||||
### 1.5 集群验证
|
||||
|
||||
使用jps查看进程,并且使用`zkServer.sh status`查看集群各个节点状态。如图三个节点进程均启动成功,并且两个节点为follower节点,一个节点为leader节点。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-cluster.png"/> </div>
|
||||
|
||||
|
||||
|
||||
## 二、Kafka集群搭建
|
||||
|
||||
### 2.1 下载解压
|
||||
|
||||
Kafka安装包官方下载地址:http://kafka.apache.org/downloads ,本用例下载的版本为`2.2.0`,下载命令:
|
||||
|
||||
```shell
|
||||
# 下载
|
||||
wget https://www-eu.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
|
||||
# 解压
|
||||
tar -xzf kafka_2.12-2.2.0.tgz
|
||||
```
|
||||
|
||||
>这里j解释一下kafka安装包的命名规则:以`kafka_2.12-2.2.0.tgz`为例,前面的2.12代表Scala的版本号(Kafka采用Scala语言进行开发),后面的2.2.0则代表Kafka的版本号。
|
||||
|
||||
### 2.2 拷贝配置文件
|
||||
|
||||
进入解压目录的` config`目录下 ,拷贝三份配置文件
|
||||
|
||||
```shell
|
||||
# cp server.properties server-1.properties
|
||||
# cp server.properties server-2.properties
|
||||
# cp server.properties server-3.properties
|
||||
```
|
||||
|
||||
### 2.3 修改配置
|
||||
|
||||
分别修改三份配置文件中的部分配置,如下:
|
||||
|
||||
server-1.properties:
|
||||
|
||||
```properties
|
||||
# The id of the broker. 集群中每个节点的唯一标识
|
||||
broker.id=0
|
||||
# 监听地址
|
||||
listeners=PLAINTEXT://hadoop001:9092
|
||||
# 日志文件存放位置
|
||||
log.dirs=/usr/local/kafka-logs/00
|
||||
# Zookeeper连接地址
|
||||
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
|
||||
```
|
||||
|
||||
server-2.properties:
|
||||
|
||||
```properties
|
||||
broker.id=1
|
||||
listeners=PLAINTEXT://hadoop001:9093
|
||||
log.dirs=/usr/local/kafka-logs/01
|
||||
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
|
||||
```
|
||||
|
||||
server-3.properties:
|
||||
|
||||
```properties
|
||||
broker.id=2
|
||||
listeners=PLAINTEXT://hadoop001:9094
|
||||
log.dirs=/usr/local/kafka-logs/02
|
||||
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
|
||||
```
|
||||
|
||||
### 2.4 启动集群
|
||||
|
||||
分别指定不同配置文件,启动三个Kafka节点。启动后可以使用jps查看进程,此时应该有三个zookeeper进程和三个kafka进程。
|
||||
|
||||
```shell
|
||||
bin/kafka-server-start.sh config/server-1.properties
|
||||
bin/kafka-server-start.sh config/server-2.properties
|
||||
bin/kafka-server-start.sh config/server-3.properties
|
||||
```
|
||||
|
||||
### 2.5 创建测试主题
|
||||
|
||||
创建测试主题:
|
||||
|
||||
```shell
|
||||
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
|
||||
```
|
||||
|
||||
创建后可以使用以下命令查看创建的主题信息:
|
||||
|
||||
```shell
|
||||
bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic
|
||||
```
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-cluster-shell.png"/> </div>
|
||||
|
||||
你也可以创建一个消费者和生产者进行连通测试:
|
||||
|
||||
```shell
|
||||
# 创建生产者
|
||||
bin/kafka-console-producer.sh --broker-list hadoop001:9093 --topic my-replicated-topic
|
||||
```
|
||||
|
||||
```shell
|
||||
# 创建消费者
|
||||
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9094 --from-beginning --topic my-replicated-topic
|
||||
```
|
||||
|
||||
## 一、Zookeeper集群搭建
|
||||
|
||||
为保证集群高可用,Zookeeper集群的节点数最好是奇数,最少有三个节点,所以这里搭建一个三个节点的集群。
|
||||
|
||||
### 1.1 下载 & 解压
|
||||
|
||||
下载对应版本Zookeeper,这里我下载的版本`3.4.14`。官方下载地址:https://archive.apache.org/dist/zookeeper/
|
||||
|
||||
```shell
|
||||
# 下载
|
||||
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
|
||||
# 解压
|
||||
tar -zxvf zookeeper-3.4.14.tar.gz
|
||||
```
|
||||
|
||||
### 1.2 修改配置
|
||||
|
||||
拷贝三份zookeeper安装包。分别进入安装目录的`conf`目录,拷贝配置样本`zoo_sample.cfg `为`zoo.cfg`并进行修改,修改后三份配置文件内容分别如下:
|
||||
|
||||
zookeeper01配置:
|
||||
|
||||
```shell
|
||||
tickTime=2000
|
||||
initLimit=10
|
||||
syncLimit=5
|
||||
dataDir=/usr/local/zookeeper-cluster/data/01
|
||||
dataLogDir=/usr/local/zookeeper-cluster/log/01
|
||||
clientPort=2181
|
||||
|
||||
# server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里
|
||||
# 指名集群间通讯端口和选举端口
|
||||
server.1=127.0.0.1:2287:3387
|
||||
server.2=127.0.0.1:2288:3388
|
||||
server.3=127.0.0.1:2289:3389
|
||||
```
|
||||
|
||||
> 如果是多台服务器,则集群中每个节点通讯端口和选举端口可相同,IP地址修改为每个节点所在主机IP即可。
|
||||
|
||||
zookeeper02配置,与zookeeper01相比,只有`dataLogDir`和`dataLogDir`不同:
|
||||
|
||||
```shell
|
||||
tickTime=2000
|
||||
initLimit=10
|
||||
syncLimit=5
|
||||
dataDir=/usr/local/zookeeper-cluster/data/02
|
||||
dataLogDir=/usr/local/zookeeper-cluster/log/02
|
||||
clientPort=2182
|
||||
|
||||
server.1=127.0.0.1:2287:3387
|
||||
server.2=127.0.0.1:2288:3388
|
||||
server.3=127.0.0.1:2289:3389
|
||||
```
|
||||
|
||||
zookeeper03配置,与zookeeper01,02相比,也只有`dataLogDir`和`dataLogDir`不同:
|
||||
|
||||
```shell
|
||||
tickTime=2000
|
||||
initLimit=10
|
||||
syncLimit=5
|
||||
dataDir=/usr/local/zookeeper-cluster/data/03
|
||||
dataLogDir=/usr/local/zookeeper-cluster/log/03
|
||||
clientPort=2183
|
||||
|
||||
server.1=127.0.0.1:2287:3387
|
||||
server.2=127.0.0.1:2288:3388
|
||||
server.3=127.0.0.1:2289:3389
|
||||
```
|
||||
|
||||
> 配置参数说明:
|
||||
>
|
||||
> - **tickTime**:用于计算的基础时间单元。比如session超时:N*tickTime;
|
||||
> - **initLimit**:用于集群,允许从节点连接并同步到 master节点的初始化连接时间,以tickTime的倍数来表示;
|
||||
> - **syncLimit**:用于集群, master主节点与从节点之间发送消息,请求和应答时间长度(心跳机制);
|
||||
> - **dataDir**:数据存储位置;
|
||||
> - **dataLogDir**:日志目录;
|
||||
> - **clientPort**:用于客户端连接的端口,默认2181
|
||||
|
||||
|
||||
|
||||
### 1.3 标识节点
|
||||
|
||||
分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。Zookeeper集群通过`myid`文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出leader节点。
|
||||
|
||||
创建存储目录:
|
||||
|
||||
```shell
|
||||
# dataDir
|
||||
mkdir -vp /usr/local/zookeeper-cluster/data/01
|
||||
# dataDir
|
||||
mkdir -vp /usr/local/zookeeper-cluster/data/02
|
||||
# dataDir
|
||||
mkdir -vp /usr/local/zookeeper-cluster/data/03
|
||||
```
|
||||
|
||||
创建并写入节点标识到`myid`文件:
|
||||
|
||||
```shell
|
||||
#server1
|
||||
echo "1" > /usr/local/zookeeper-cluster/data/01/myid
|
||||
#server2
|
||||
echo "2" > /usr/local/zookeeper-cluster/data/02/myid
|
||||
#server3
|
||||
echo "3" > /usr/local/zookeeper-cluster/data/03/myid
|
||||
```
|
||||
|
||||
### 1.4 启动集群
|
||||
|
||||
分别启动三个节点:
|
||||
|
||||
```shell
|
||||
# 启动节点1
|
||||
/usr/app/zookeeper-cluster/zookeeper01/bin/zkServer.sh start
|
||||
# 启动节点2
|
||||
/usr/app/zookeeper-cluster/zookeeper02/bin/zkServer.sh start
|
||||
# 启动节点3
|
||||
/usr/app/zookeeper-cluster/zookeeper03/bin/zkServer.sh start
|
||||
```
|
||||
|
||||
### 1.5 集群验证
|
||||
|
||||
使用jps查看进程,并且使用`zkServer.sh status`查看集群各个节点状态。如图三个节点进程均启动成功,并且两个节点为follower节点,一个节点为leader节点。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-cluster.png"/> </div>
|
||||
|
||||
|
||||
|
||||
## 二、Kafka集群搭建
|
||||
|
||||
### 2.1 下载解压
|
||||
|
||||
Kafka安装包官方下载地址:http://kafka.apache.org/downloads ,本用例下载的版本为`2.2.0`,下载命令:
|
||||
|
||||
```shell
|
||||
# 下载
|
||||
wget https://www-eu.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
|
||||
# 解压
|
||||
tar -xzf kafka_2.12-2.2.0.tgz
|
||||
```
|
||||
|
||||
>这里j解释一下kafka安装包的命名规则:以`kafka_2.12-2.2.0.tgz`为例,前面的2.12代表Scala的版本号(Kafka采用Scala语言进行开发),后面的2.2.0则代表Kafka的版本号。
|
||||
|
||||
### 2.2 拷贝配置文件
|
||||
|
||||
进入解压目录的` config`目录下 ,拷贝三份配置文件
|
||||
|
||||
```shell
|
||||
# cp server.properties server-1.properties
|
||||
# cp server.properties server-2.properties
|
||||
# cp server.properties server-3.properties
|
||||
```
|
||||
|
||||
### 2.3 修改配置
|
||||
|
||||
分别修改三份配置文件中的部分配置,如下:
|
||||
|
||||
server-1.properties:
|
||||
|
||||
```properties
|
||||
# The id of the broker. 集群中每个节点的唯一标识
|
||||
broker.id=0
|
||||
# 监听地址
|
||||
listeners=PLAINTEXT://hadoop001:9092
|
||||
# 数据的存储位置
|
||||
log.dirs=/usr/local/kafka-logs/00
|
||||
# Zookeeper连接地址
|
||||
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
|
||||
```
|
||||
|
||||
server-2.properties:
|
||||
|
||||
```properties
|
||||
broker.id=1
|
||||
listeners=PLAINTEXT://hadoop001:9093
|
||||
log.dirs=/usr/local/kafka-logs/01
|
||||
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
|
||||
```
|
||||
|
||||
server-3.properties:
|
||||
|
||||
```properties
|
||||
broker.id=2
|
||||
listeners=PLAINTEXT://hadoop001:9094
|
||||
log.dirs=/usr/local/kafka-logs/02
|
||||
zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
|
||||
```
|
||||
|
||||
针对上面配置,需要特别说明的是`log.dirs`指的是数据日志的存储位置,确切的说,就是分区数据的存储位置,而不是程序运行日志的位置。程序运行日志的位置是通过同一目录下的`log4j.properties`进行配置的。
|
||||
|
||||
### 2.4 启动集群
|
||||
|
||||
分别指定不同配置文件,启动三个Kafka节点。启动后可以使用jps查看进程,此时应该有三个zookeeper进程和三个kafka进程。
|
||||
|
||||
```shell
|
||||
bin/kafka-server-start.sh config/server-1.properties
|
||||
bin/kafka-server-start.sh config/server-2.properties
|
||||
bin/kafka-server-start.sh config/server-3.properties
|
||||
```
|
||||
|
||||
### 2.5 创建测试主题
|
||||
|
||||
创建测试主题:
|
||||
|
||||
```shell
|
||||
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
|
||||
```
|
||||
|
||||
创建后可以使用以下命令查看创建的主题信息:
|
||||
|
||||
```shell
|
||||
bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic
|
||||
```
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/kafka-cluster-shell.png"/> </div>
|
||||
|
||||
|
||||
|
||||
可以看到分区0的有0,1,2三个副本,且三个副本都是可用副本,都在ISR(in-sync Replica 同步副本)列表中,其中1为首领副本,此时代表集群已经搭建成功。
|
||||
|
||||
|
||||
|
||||
|
BIN
pictures/kafka-BIO.png
Normal file
BIN
pictures/kafka-BIO.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 249 KiB |
BIN
pictures/kafka-compress-message.png
Normal file
BIN
pictures/kafka-compress-message.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 81 KiB |
BIN
pictures/kafka-元数据请求.png
Normal file
BIN
pictures/kafka-元数据请求.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 55 KiB |
BIN
pictures/kafka-分区副本.png
Normal file
BIN
pictures/kafka-分区副本.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 14 KiB |
BIN
pictures/kafka-数据可见性.png
Normal file
BIN
pictures/kafka-数据可见性.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 60 KiB |
BIN
pictures/kafka-零拷贝.png
Normal file
BIN
pictures/kafka-零拷贝.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 202 KiB |
Loading…
x
Reference in New Issue
Block a user