Kafka生产者详解
This commit is contained in:
parent
d510252c2c
commit
774d2bf032
@ -13,9 +13,9 @@
|
||||
|
||||
## 一、生产者发送消息的过程
|
||||
|
||||
首先介绍一下Kakfa生产者发送消息的过程:
|
||||
首先介绍一下Kafka生产者发送消息的过程:
|
||||
|
||||
+ Kakfa 会将发送消息包装为ProducerRecord对象, ProducerRecord对象包含了目标主题和要发送的内容,同时还可以指定键或分区。在发送ProducerRecord对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
|
||||
+ Kafka会将发送消息包装为ProducerRecord对象, ProducerRecord对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送ProducerRecord对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
|
||||
+ 接下来,数据被传给分区器。如果之前已经在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据ProducerRecord对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker上。
|
||||
+ 服务器在收到这些消息时会返回一个响应。如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。
|
||||
|
||||
@ -40,9 +40,7 @@
|
||||
创建Kafka生产者时,以下三个属性是必须指定的:
|
||||
|
||||
+ **bootstrap.servers** :指定broker的地址清单,清单里不需要包含所有的broker地址,生产者会从给定的broker里查找broker的信息。不过建议至少要提供两个broker的信息作为容错;
|
||||
|
||||
+ **key.serializer** :指定键的序列化器;
|
||||
|
||||
+ **value.serializer** :指定值的序列化器。
|
||||
|
||||
创建的示例代码如下:
|
||||
@ -79,7 +77,7 @@ public class SimpleProducer {
|
||||
|
||||
#### 1. 启动Kakfa
|
||||
|
||||
Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的。
|
||||
Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的:
|
||||
|
||||
```shell
|
||||
# zookeeper启动命令
|
||||
@ -126,7 +124,7 @@ bin/kafka-topics.sh --create \
|
||||
|
||||
### 2.4 可能出现的问题
|
||||
|
||||
在这里可能出现的一个问题是:即使你在程序中配置的服务器地址是完全正确的,但是生产者程序在启动后,却一直处于等待状态,然后抛出连接超时的异常。这通常出现在你使用默认配置启动Kafka的情况下,这时候只需要对`server.properties`中`listeners`配置进行如下修改即可:
|
||||
在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动Kafka的情况下,此时需要对`server.properties`文件中的`listeners`配置进行更改:
|
||||
|
||||
```shell
|
||||
# hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
|
||||
@ -137,7 +135,7 @@ listeners=PLAINTEXT://hadoop001:9092
|
||||
|
||||
## 二、发送消息
|
||||
|
||||
上面的示例程序调用了`send`方法发送消息后没有做任何操作,在这种情况下,我们是没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。
|
||||
上面的示例程序调用了`send`方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。
|
||||
|
||||
### 2.1 同步发送
|
||||
|
||||
@ -174,7 +172,7 @@ topic=Hello-Kafka, partition=0, offset=49
|
||||
|
||||
### 2.2 异步发送
|
||||
|
||||
通常情况下,我们并不关心发送成功的情况,更多关注的是失败的情况,基于这种需求,Kafka提供了异步发送和回调函数。 代码如下:
|
||||
通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此Kafka提供了异步发送和回调函数。 代码如下:
|
||||
|
||||
```scala
|
||||
for (int i = 0; i < 10; i++) {
|
||||
@ -200,7 +198,7 @@ for (int i = 0; i < 10; i++) {
|
||||
|
||||
Kafka有着默认的分区机制:
|
||||
|
||||
+ 如果键值为 null , 则使用轮询(Round Robin)算法将消息均衡地分布到各个分区上;
|
||||
+ 如果键值为 null, 则使用轮询(Round Robin)算法将消息均衡地分布到各个分区上;
|
||||
+ 如果键值不为null,那么Kafka会使用内置的散列算法对键进行散列,然后分布到各个分区上。
|
||||
|
||||
某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例:
|
||||
@ -307,9 +305,9 @@ score:5, partition=0,
|
||||
|
||||
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
|
||||
|
||||
- acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
|
||||
- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
|
||||
- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
|
||||
- **acks=0** : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
|
||||
- **acks=1** : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
|
||||
- **acks=all** :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
|
||||
|
||||
### 2. buffer.memory
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user