From 72080ba384c03e83972fdd47516f5a90c40252a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Mon, 3 Jun 2019 17:43:06 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E6=B6=88=E8=B4=B9=E8=80=85=E8=AF=A6?= =?UTF-8?q?=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- notes/Kafka消费者详解.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/notes/Kafka消费者详解.md b/notes/Kafka消费者详解.md index 4faea8c..c075591 100644 --- a/notes/Kafka消费者详解.md +++ b/notes/Kafka消费者详解.md @@ -19,7 +19,7 @@ ## 一、消费者和消费者群组 -在Kafka中,消费者通常是消费者群组的一部分,多个消费者群组中消费者共同读取同一个主题时,彼此之间互不影响。这是因为Kafka消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS ,或者进行耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是横向伸缩的主要手段。 +在Kafka中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka之所以要引入消费者群组这个概念是因为Kafka消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度。此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是Kafka实现横向伸缩的主要手段。
@@ -49,7 +49,7 @@ + **consumer.subscribe(Collection\ topics)** :指明需要订阅的主题的集合; + **consumer.subscribe(Pattern pattern)** :使用正则来匹配需要订阅的集合。 -最后只需要通过轮询API(`poll`)向服务器定时请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,这使得开发者只需要关注从分区返回的数据,然后进行业务处理即可。 示例代码如下: +最后只需要通过轮询API(`poll`)向服务器定时请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,这使得开发者只需要关注从分区返回的数据,然后进行业务处理。 示例如下: ```scala String topic = "Hello-Kafka"; @@ -95,7 +95,7 @@ Kafka的每一条消息都有一个偏移量属性,记录了其在分区中的 Kafka支持自动提交和手动提交偏移量两种方式。这里先介绍比较简单的自动提交: -只需要将消费者的`enable.auto.commit`属性配置为true即可完成自动提交的配置。 此时每隔固定的时间,消费者就会把`poll()`方法接收到的最大偏移量进行提交,提交间隔由`auto.commit.interval.ms`属性进行配置,默认值是5s。 +只需要将消费者的`enable.auto.commit`属性配置为`true`即可完成自动提交的配置。 此时每隔固定的时间,消费者就会把`poll()`方法接收到的最大偏移量进行提交,提交间隔由`auto.commit.interval.ms`属性进行配置,默认值是5s。 使用自动提交是存在隐患的,假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。基于这个原因,Kafka也提供了手动提交偏移量的API,使得用户可以更为灵活的提交偏移量。 @@ -151,9 +151,9 @@ while (true) { } ``` -异步提交存在的问题是,在提交失败的时候不会进行自动重试,实际上也不能进行自动重试。假设程序同时提交了200和300的偏移量,此时200的偏移量失败的,但是紧随其后的300的偏移量成功了,此时如果重试就会存在200覆盖300偏移量的可能。同步提交就不存在这个问题,因为在同步提交的情况下,300的提交请求必须等待服务器返回200提交请求的成功反馈时候才会发出。基于这个原因,某些情况下,需要同时组合同步和异步两种提交方式。 +异步提交存在的问题是,在提交失败的时候不会进行自动重试,实际上也不能进行自动重试。假设程序同时提交了200和300的偏移量,此时200的偏移量失败的,但是紧随其后的300的偏移量成功了,此时如果重试就会存在200覆盖300偏移量的可能。同步提交就不存在这个问题,因为在同步提交的情况下,300的提交请求必须等待服务器返回200提交请求的成功反馈后才会发出。基于这个原因,某些情况下,需要同时组合同步和异步两种提交方式。 -> 注:虽然程序不在再失败时候进行自动重试,但是我们是可以手动进行重试的,你可以通过一个 Map offsets 来维护你提交的每个分区的偏移量,然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。 +> 注:虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试的,你可以通过一个 Map offsets 来维护你提交的每个分区的偏移量,然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。 ### 4.3 同步加异步提交 @@ -183,7 +183,7 @@ try { ### 4.4 提交特定偏移量 -在上面同步和异步提交的API中,实际上我们都没有对commit方法传递参数,此时默认提交的是当前轮训的最大偏移量,如果你需要提交特定的偏移量,可以调用它们的重载方法。 +在上面同步和异步提交的API中,实际上我们都没有对commit方法传递参数,此时默认提交的是当前轮询的最大偏移量,如果你需要提交特定的偏移量,可以调用它们的重载方法。 ```java /*同步提交特定偏移量*/ @@ -218,7 +218,7 @@ try { ## 五、监听分区再均衡 -因为分区再均衡会导致分区与消费者的重新划分,有时候你可能希望在再均衡前执行一些操作:比如提交已经处理但是尚未的偏移量,关闭数据库连接等。此时可以在订阅主题时候,调用`subscribe`的重载方法传入自定义的分区再均衡监听器。 +因为分区再均衡会导致分区与消费者的重新划分,有时候你可能希望在再均衡前执行一些操作:比如提交已经处理但是尚未提交的偏移量,关闭数据库连接等。此时可以在订阅主题时候,调用`subscribe`的重载方法传入自定义的分区再均衡监听器。 ```java /*订阅指定集合内的所有主题*/ @@ -269,7 +269,7 @@ try { ## 六 、退出轮询 -Kafka提供了`consumer.wakeup()`方法用于退出轮询,它通过抛出`WakeupException`异常来跳出循环。需要注意的是,在退出线程最好显示的调用`consumer.close()` , 此时消费者会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。 +Kafka提供了`consumer.wakeup()`方法用于退出轮询,它通过抛出`WakeupException`异常来跳出循环。需要注意的是,在退出线程时最好显示的调用`consumer.close()` , 此时消费者会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。 下面的示例代码为监听控制台输出,当输入`exit`时结束轮询,关闭消费者并退出程序: