record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer) {
System.out.println("消费者收到消息:" + record.value() + "; topic:" + topic);
/*
* 如果需要手工提交异步 consumer.commitSync();
* 手工同步提交 consumer.commitAsync()
*/
}
}
```
#### 2.4 测试整合结果
```java
@Slf4j
@RestController
public class SendMsgController {
@Autowired
private KafKaCustomrProducer producer;
@Autowired
private KafkaTemplate kafkaTemplate;
/***
* 发送消息体为基本类型的消息
*/
@GetMapping("sendSimple")
public void sendSimple() {
producer.sendMessage(Topic.SIMPLE, "hello spring boot kafka");
}
}
```
## 三、关于多消费者组的测试
#### 3.1 创建多分区主题
```java
/**
* @author : heibaiying
* @description : kafka配置类
*/
@Configuration
public class KafkaConfig {
@Bean
public NewTopic groupTopic() {
// 指定主题名称,分区数量,和复制因子
return new NewTopic(Topic.GROUP, 10, (short) 2);
}
}
```
#### 3.2 多消费者组对同一主题的监听
1. 消费者1-1 监听主题的 0、1 分区
2. 消费者1-2 监听主题的 2、3 分区
3. 消费者1-3 监听主题的 0、1 分区
4. 消费者2-1 监听主题的所有分区
```java
/**
* @author : heibaiying
* @description : kafka 消费者组
*
* 多个消费者群组可以共同读取同一个主题,彼此之间互不影响。
*/
@Component
@Slf4j
public class KafkaGroupConsumer {
// 分组1 中的消费者1
@KafkaListener(id = "consumer1-1", groupId = "group1", topicPartitions =
{@TopicPartition(topic = Topic.GROUP, partitions = {"0", "1"})
})
public void consumer1_1(ConsumerRecord record) {
System.out.println("consumer1-1 收到消息:" + record.value());
}
// 分组1 中的消费者2
@KafkaListener(id = "consumer1-2", groupId = "group1", topicPartitions =
{@TopicPartition(topic = Topic.GROUP, partitions = {"2", "3"})
})
public void consumer1_2(ConsumerRecord record) {
System.out.println("consumer1-2 收到消息:" + record.value());
}
// 分组1 中的消费者3
@KafkaListener(id = "consumer1-3", groupId = "group1", topicPartitions =
{@TopicPartition(topic = Topic.GROUP, partitions = {"0", "1"})
})
public void consumer1_3(ConsumerRecord record) {
System.out.println("consumer1-3 收到消息:" + record.value());
}
// 分组2 中的消费者
@KafkaListener(id = "consumer2-1", groupId = "group2", topics = Topic.GROUP)
public void consumer2_1(ConsumerRecord record) {
System.err.println("consumer2-1 收到消息:" + record.value());
}
}
```
#### 3.2 发送消息时候指定主题的具体分区
```java
/***
* 多消费者组、组中多消费者对同一主题的消费情况
*/
@GetMapping("sendGroup")
public void sendGroup() {
for (int i = 0; i < 4; i++) {
// 第二个参数指定分区,第三个参数指定消息键 分区优先
ListenableFuture> future = kafkaTemplate.send(Topic.GROUP, i % 4, "key", "hello group " + i);
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
log.info("发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:" + sendResult.toString());
}
});
}
}
```
测试结果:
```yaml
# 主要看每次发送结果中的 partition 属性,代表四次消息分别发送到了主题的0,1,2,3分区
发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=1, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 1, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-1@13]
发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 0, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-0@19]
发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=3, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 3, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-3@13]
发送结果:SendResult [producerRecord=ProducerRecord(topic=spring.boot.kafka.newGroup, partition=2, headers=RecordHeaders(headers = [], isReadOnly = true), key=key, value=hello group 2, timestamp=null), recordMetadata=spring.boot.kafka.newGroup-2@13]
# 消费者组2 接收情况
consumer2-1 收到消息:hello group 1
consumer2-1 收到消息:hello group 0
consumer2-1 收到消息:hello group 2
consumer2-1 收到消息:hello group 3
# 消费者1-1接收情况
consumer1-1 收到消息:hello group 1
consumer1-1 收到消息:hello group 0
# 消费者1-3接收情况
consumer1-3 收到消息:hello group 1
consumer1-3 收到消息:hello group 0
# 消费者1-2接收情况
consumer1-2 收到消息:hello group 3
consumer1-2 收到消息:hello group 2
```
#### 3.4 测试结果
1. 和kafka 原本的机制一样,多消费者组之间对于同一个主题的消费彼此之间互不影响;
2. 和kafka原本机制不一样的是,这里我们消费者1-1和消费1-3共同属于同一个消费者组,并且监听同样的分区,按照原本kafka的机制,群组保证每个分区只能被同一个消费者组的一个消费者使用,但是按照spring的声明方式实现的消息监听,这里被两个消费者都监听到了。
## 四、序列化与反序列化
用例采用的是第三方fastjson将实体类序列化为json后发送。实现如下:
```java
/***
* 发送消息体为bean的消息
*/
@GetMapping("sendBean")
public void sendBean() {
Programmer programmer = new Programmer("xiaoming", 12, 21212.33f, new Date());
producer.sendMessage(Topic.BEAN, JSON.toJSON(programmer).toString());
}
```
```java
@Component
@Slf4j
public class KafkaBeanConsumer {
@KafkaListener(groupId = "beanGroup",topics = Topic.BEAN)
public void consumer(ConsumerRecord record) {
System.out.println("消费者收到消息:" + JSON.parseObject(record.value().toString(), Programmer.class));
}
}
```