record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer) {
- System.out.println("消费者收到消息:" + record.value() + "; topic:" + topic);
- /*
- * 如果需要手工提交异步 consumer.commitSync();
- * 手工同步提交 consumer.commitAsync()
- */
- }
-}
-```
-
-#### 2.4 测试整合结果
-
-```java
-@Slf4j
-@RestController
-public class SendMsgController {
-
- @Autowired
- private KafKaCustomrProducer producer;
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- /***
- * 发送消息体为基本类型的消息
- */
- @GetMapping("sendSimple")
- public void sendSimple() {
- producer.sendMessage(Topic.SIMPLE, "hello spring boot kafka");
- }
-}
-```
-
-
-
-## 三、关于多消费者组的测试
-
-#### 3.1 创建多分区主题
-
-```java
-/**
- * @author : heibaiying
- * @description : kafka 配置类
- */
-@Configuration
-public class KafkaConfig {
-
- @Bean
- public NewTopic groupTopic() {
- // 指定主题名称,分区数量,和复制因子
- return new NewTopic(Topic.GROUP, 10, (short) 2);
- }
-
-}
-```
-
-#### 3.2 多消费者组对同一主题的监听
-
-1. 消费者 1-1 监听主题的 0、1 分区
-2. 消费者 1-2 监听主题的 2、3 分区
-3. 消费者 1-3 监听主题的 0、1 分区
-4. 消费者 2-1 监听主题的所有分区
-
-```java
-/**
- * @author : heibaiying
- * @description : kafka 消费者组
- *
- * 多个消费者群组可以共同读取同一个主题,彼此之间互不影响。
- */
-@Component
-@Slf4j
-public class KafkaGroupConsumer {
-
- // 分组 1 中的消费者 1
- @KafkaListener(id = "consumer1-1", groupId = "group1", topicPartitions =
- {@TopicPartition(topic = Topic.GROUP, partitions = {"0", "1"})
- })
- public void consumer1_1(ConsumerRecord record) {
- System.out.println("consumer1-1 收到消息:" + record.value());
- }
-
- // 分组 1 中的消费者 2
- @KafkaListener(id = "consumer1-2", groupId = "group1", topicPartitions =
- {@TopicPartition(topic = Topic.GROUP, partitions = {"2", "3"})
- })
- public void consumer1_2(ConsumerRecord record) {
- System.out.println("consumer1-2 收到消息:" + record.value());
- }
-
- // 分组 1 中的消费者 3
- @KafkaListener(id = "consumer1-3", groupId = "group1", topicPartitions =
- {@TopicPartition(topic = Topic.GROUP, partitions = {"0", "1"})
- })
- public void consumer1_3(ConsumerRecord record) {
- System.out.println("consumer1-3 收到消息:" + record.value());
- }
-
- // 分组 2 中的消费者
- @KafkaListener(id = "consumer2-1", groupId = "group2", topics = Topic.GROUP)
- public void consumer2_1(ConsumerRecord record) {
- System.err.println("consumer2-1 收到消息:" + record.value());
- }
-}
-
-```
-
-#### 3.2 发送消息时候指定主题的具体分区
-
-```java
-/***
- * 多消费者组、组中多消费者对同一主题的消费情况
- */
-@GetMapping("sendGroup")
-public void sendGroup() {
- for (int i = 0; i < 4; i++) {
- // 第二个参数指定分区,第三个参数指定消息键 分区优先
- ListenableFuture> future = kafkaTemplate.send(Topic.GROUP, i % 4, "key", "hello group " + i);
- future.addCallback(new ListenableFutureCallback>() {
- @Override
- public void onFailure(Throwable throwable) {
- log.info("发送消息失败:" + throwable.getMessage());
- }
-
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("发送结果:" + sendResult.toString());
- }
- });
- }
-}
-```
-
-测试结果:
-
-```yaml
-# 主要看每次发送结果中的 partition 属性,代表四次消息分别发送到了主题的0,1,2,3分区
-发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=1, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 1, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-1@13]
-发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 0, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-0@19]
-发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=3, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 3, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-3@13]
-发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=2, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 2, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-2@13]
-# 消费者组2 接收情况
-consumer2-1 收到消息:hello group 1
-consumer2-1 收到消息:hello group 0
-consumer2-1 收到消息:hello group 2
-consumer2-1 收到消息:hello group 3
-# 消费者1-1接收情况
-consumer1-1 收到消息:hello group 1
-consumer1-1 收到消息:hello group 0
-# 消费者1-3接收情况
-consumer1-3 收到消息:hello group 1
-consumer1-3 收到消息:hello group 0
-# 消费者1-2接收情况
-consumer1-2 收到消息:hello group 3
-consumer1-2 收到消息:hello group 2
-```
-
-#### 3.4 测试结果
-
-1. 和 kafka 原本的机制一样,多消费者组之间对于同一个主题的消费彼此之间互不影响;
-2. 和 kafka 原本机制不一样的是,这里我们消费者 1-1 和消费 1-3 共同属于同一个消费者组,并且监听同样的分区,按照原本 kafka 的机制,群组保证每个分区只能被同一个消费者组的一个消费者使用,但是按照 spring 的声明方式实现的消息监听,这里被两个消费者都监听到了。
-
-
-
-## 四、序列化与反序列化
-
-用例采用的是第三方 fastjson 将实体类序列化为 json 后发送。实现如下:
-
-```java
-/***
- * 发送消息体为 bean 的消息
- */
-@GetMapping("sendBean")
-public void sendBean() {
- Programmer programmer = new Programmer("xiaoming", 12, 21212.33f, new Date());
- producer.sendMessage(Topic.BEAN, JSON.toJSON(programmer).toString());
-}
-
-```
-
-```java
-@Component
-@Slf4j
-public class KafkaBeanConsumer {
-
- @KafkaListener(groupId = "beanGroup",topics = Topic.BEAN)
- public void consumer(ConsumerRecord record) {
- System.out.println("消费者收到消息:" + JSON.parseObject(record.value().toString(), Programmer.class));
- }
-}
-```
-
+