From 774d2bf032c65e42fc100ab002fc094d3f4f844c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Mon, 3 Jun 2019 17:24:38 +0800 Subject: [PATCH] =?UTF-8?q?Kafka=E7=94=9F=E4=BA=A7=E8=80=85=E8=AF=A6?= =?UTF-8?q?=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- notes/Kafka生产者详解.md | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/notes/Kafka生产者详解.md b/notes/Kafka生产者详解.md index 679cd51..2bb4834 100644 --- a/notes/Kafka生产者详解.md +++ b/notes/Kafka生产者详解.md @@ -13,9 +13,9 @@ ## 一、生产者发送消息的过程 -首先介绍一下Kakfa生产者发送消息的过程: +首先介绍一下Kafka生产者发送消息的过程: -+ Kakfa 会将发送消息包装为ProducerRecord对象, ProducerRecord对象包含了目标主题和要发送的内容,同时还可以指定键或分区。在发送ProducerRecord对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 ++ Kafka会将发送消息包装为ProducerRecord对象, ProducerRecord对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送ProducerRecord对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 + 接下来,数据被传给分区器。如果之前已经在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据ProducerRecord对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker上。 + 服务器在收到这些消息时会返回一个响应。如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。 @@ -40,9 +40,7 @@ 创建Kafka生产者时,以下三个属性是必须指定的: + **bootstrap.servers** :指定broker的地址清单,清单里不需要包含所有的broker地址,生产者会从给定的broker里查找broker的信息。不过建议至少要提供两个broker的信息作为容错; - + **key.serializer** :指定键的序列化器; - + **value.serializer** :指定值的序列化器。 创建的示例代码如下: @@ -79,7 +77,7 @@ public class SimpleProducer { #### 1. 启动Kakfa -Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的。 +Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的: ```shell # zookeeper启动命令 @@ -126,7 +124,7 @@ bin/kafka-topics.sh --create \ ### 2.4 可能出现的问题 -在这里可能出现的一个问题是:即使你在程序中配置的服务器地址是完全正确的,但是生产者程序在启动后,却一直处于等待状态,然后抛出连接超时的异常。这通常出现在你使用默认配置启动Kafka的情况下,这时候只需要对`server.properties`中`listeners`配置进行如下修改即可: +在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动Kafka的情况下,此时需要对`server.properties`文件中的`listeners`配置进行更改: ```shell # hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址 @@ -137,7 +135,7 @@ listeners=PLAINTEXT://hadoop001:9092 ## 二、发送消息 -上面的示例程序调用了`send`方法发送消息后没有做任何操作,在这种情况下,我们是没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。 +上面的示例程序调用了`send`方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。 ### 2.1 同步发送 @@ -174,7 +172,7 @@ topic=Hello-Kafka, partition=0, offset=49 ### 2.2 异步发送 -通常情况下,我们并不关心发送成功的情况,更多关注的是失败的情况,基于这种需求,Kafka提供了异步发送和回调函数。 代码如下: +通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此Kafka提供了异步发送和回调函数。 代码如下: ```scala for (int i = 0; i < 10; i++) { @@ -200,7 +198,7 @@ for (int i = 0; i < 10; i++) { Kafka有着默认的分区机制: -+ 如果键值为 null , 则使用轮询(Round Robin)算法将消息均衡地分布到各个分区上; ++ 如果键值为 null, 则使用轮询(Round Robin)算法将消息均衡地分布到各个分区上; + 如果键值不为null,那么Kafka会使用内置的散列算法对键进行散列,然后分布到各个分区上。 某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例: @@ -307,9 +305,9 @@ score:5, partition=0, acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的: -- acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应; -- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应; -- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 +- **acks=0** : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应; +- **acks=1** : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应; +- **acks=all** :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 ### 2. buffer.memory