From df2648e3fd68d2fe2adf950a94113e94f54233f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Fri, 4 Jan 2019 18:00:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20spring=20boot=20kafka=20?= =?UTF-8?q?=E6=95=B4=E5=90=88=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring-boot/spring-boot-kafka/README.md | 7 +++ spring-boot/spring-boot-kafka/pom.xml | 62 +++++++++++++++++++ .../Producer/KafKaCustomrProducer.java | 45 ++++++++++++++ .../SpringBootKafkaApplication.java | 14 +++++ .../springboot/bean/Programmer.java | 24 +++++++ .../heibaiying/springboot/constant/Topic.java | 10 +++ .../consumer/KafkaBeanConsumer.java | 24 +++++++ .../consumer/KafkaGroupConsumer.java | 37 +++++++++++ .../consumer/KafkaSimpleConsumer.java | 25 ++++++++ .../src/main/resources/application.yml | 36 +++++++++++ .../com/heibaiying/springboot/KafkaTests.java | 51 +++++++++++++++ 11 files changed, 335 insertions(+) create mode 100644 spring-boot/spring-boot-kafka/README.md create mode 100644 spring-boot/spring-boot-kafka/pom.xml create mode 100644 spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/Producer/KafKaCustomrProducer.java create mode 100644 spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/SpringBootKafkaApplication.java create mode 100644 spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/bean/Programmer.java create mode 100644 spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/constant/Topic.java create mode 100644 spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaBeanConsumer.java create mode 100644 spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaGroupConsumer.java create mode 100644 spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaSimpleConsumer.java create mode 100644 spring-boot/spring-boot-kafka/src/main/resources/application.yml create mode 100644 spring-boot/spring-boot-kafka/src/test/java/com/heibaiying/springboot/KafkaTests.java diff --git a/spring-boot/spring-boot-kafka/README.md b/spring-boot/spring-boot-kafka/README.md new file mode 100644 index 0000000..2f5a8b4 --- /dev/null +++ b/spring-boot/spring-boot-kafka/README.md @@ -0,0 +1,7 @@ +java.time.Duration类下: +private static final Pattern PATTERN = + Pattern.compile("([-+]?)P(?:([-+]?[0-9]+)D)?" + + "(T(?:([-+]?[0-9]+)H)?(?:([-+]?[0-9]+)M)?(?:([-+]?[0-9]+)(?:[.,]([0-9]{0,9}))?S)?)?", + Pattern.CASE_INSENSITIVE); + +1s 1.5s 0s 0.001S 1h 2d \ No newline at end of file diff --git a/spring-boot/spring-boot-kafka/pom.xml b/spring-boot/spring-boot-kafka/pom.xml new file mode 100644 index 0000000..c121303 --- /dev/null +++ b/spring-boot/spring-boot-kafka/pom.xml @@ -0,0 +1,62 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.1.1.RELEASE + + + com.heibaiying + spring-boot-kafka + 0.0.1-SNAPSHOT + spring-boot-kafka + kakfa project for Spring Boot + + + 1.8 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test + + + com.alibaba + fastjson + 1.2.54 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/Producer/KafKaCustomrProducer.java b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/Producer/KafKaCustomrProducer.java new file mode 100644 index 0000000..9c9cd07 --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/Producer/KafKaCustomrProducer.java @@ -0,0 +1,45 @@ +package com.heibaiying.springboot.Producer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +/** + * @author : heibaiying + * @description : 生产者 + */ + +@Component +@Slf4j +public class KafKaCustomrProducer { + + @Autowired + private KafkaTemplate kafkaTemplate; + + public void sendMessage(String topic, Object object) { + + /* + * 这里的ListenableFuture类是spring对java原生Future的扩展增强,是一个泛型接口,用于监听异步方法的回调 + * 而对于kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult,而这里K,V的泛型实际上 + * 被用于ProducerRecord producerRecord,即生产者发送消息的key,value 类型 + */ + ListenableFuture> future = kafkaTemplate.send(topic, object); + future.addCallback(new ListenableFutureCallback>() { + @Override + public void onFailure(Throwable throwable) { + log.info("发送消息失败:" + throwable.getMessage()); + } + + @Override + public void onSuccess(SendResult sendResult) { + System.out.println("发送结果:" + sendResult.toString()); + } + }); + } +} + + diff --git a/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/SpringBootKafkaApplication.java b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/SpringBootKafkaApplication.java new file mode 100644 index 0000000..f6f7309 --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/SpringBootKafkaApplication.java @@ -0,0 +1,14 @@ +package com.heibaiying.springboot; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringBootKafkaApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringBootKafkaApplication.class, args); + } + +} + diff --git a/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/bean/Programmer.java b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/bean/Programmer.java new file mode 100644 index 0000000..c0889aa --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/bean/Programmer.java @@ -0,0 +1,24 @@ +package com.heibaiying.springboot.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.Date; + +@Data +@AllArgsConstructor +@NoArgsConstructor +// 需要实现序列化接口 +public class Programmer implements Serializable { + + private String name; + + private int age; + + private float salary; + + private Date birthday; +} + diff --git a/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/constant/Topic.java b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/constant/Topic.java new file mode 100644 index 0000000..8dd0e03 --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/constant/Topic.java @@ -0,0 +1,10 @@ +package com.heibaiying.springboot.constant; + +// 实际开发时建议写到配置文件中 +public class Topic { + + public static final String GROUP="spring.boot.kafka.group"; + public static final String SIMPLE="spring.boot.kafka.simple"; + public static final String BEAN="spring.boot.kafka.bean"; + +} diff --git a/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaBeanConsumer.java b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaBeanConsumer.java new file mode 100644 index 0000000..540c30d --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaBeanConsumer.java @@ -0,0 +1,24 @@ +package com.heibaiying.springboot.consumer; + +import com.alibaba.fastjson.JSON; +import com.heibaiying.springboot.bean.Programmer; +import com.heibaiying.springboot.constant.Topic; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * @author : heibaiying + * @description : kafka 消费者 + */ + +@Component +@Slf4j +public class KafkaBeanConsumer { + + @KafkaListener(groupId = "group", topics = Topic.BEAN) + public void consumer(ConsumerRecord record) { + System.out.println("消费者收到消息:" + JSON.parseObject(record.value().toString(), Programmer.class)); + } +} diff --git a/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaGroupConsumer.java b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaGroupConsumer.java new file mode 100644 index 0000000..7019486 --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaGroupConsumer.java @@ -0,0 +1,37 @@ +package com.heibaiying.springboot.consumer; + +import com.heibaiying.springboot.constant.Topic; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * @author : heibaiying + * @description : kafka 消费者组 + * + * 多个消费者群组可以共同读取同一个主题,彼此之间互不影响。 + * 但主题的一个分区只能被同一个消费者群组里面的一个消费者读取。 + */ +@Component +@Slf4j +public class KafkaGroupConsumer { + + // 分组1 中的消费者1 + @KafkaListener(id = "consumer1-1", groupId = "group1", topics = Topic.GROUP) + public void consumer1_1(ConsumerRecord record) { + System.out.println("consumer1-1 收到消息:" + record.value()); + } + + // 分组1 中的消费者2 + @KafkaListener(id = "consumer1-2", groupId = "group1", topics = Topic.GROUP) + public void consumer1_2(ConsumerRecord record) { + System.out.println("consumer1-2 收到消息:" + record.value()); + } + + // 分组2 中的消费者 + @KafkaListener(id = "consumer2-1", groupId = "group2", topics = Topic.GROUP) + public void consumer2_1(ConsumerRecord record) { + System.out.println("consumer2-1 收到消息:" + record.value()); + } +} diff --git a/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaSimpleConsumer.java b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaSimpleConsumer.java new file mode 100644 index 0000000..55d722c --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/java/com/heibaiying/springboot/consumer/KafkaSimpleConsumer.java @@ -0,0 +1,25 @@ +package com.heibaiying.springboot.consumer; + +import com.heibaiying.springboot.constant.Topic; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +/** + * @author : heibaiying + * @description : kafka 简单消息消费者 + */ + +@Component +@Slf4j +public class KafkaSimpleConsumer { + + // 简单消费者 + @KafkaListener(groupId = "group",topics = Topic.SIMPLE) + public void consumer1_1(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + System.out.println("消费者收到消息:" + record.value() + "; topic:" + topic); + } +} diff --git a/spring-boot/spring-boot-kafka/src/main/resources/application.yml b/spring-boot/spring-boot-kafka/src/main/resources/application.yml new file mode 100644 index 0000000..f358f03 --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/main/resources/application.yml @@ -0,0 +1,36 @@ +spring: + kafka: + # ԶŷָĵַбڽKafkaȺijʼ(kafka ĬϵĶ˿ںΪ9092) + bootstrap-servers: 127.0.0.1:9092 + producer: + # ϢطĴ + retries: 0 + #жϢҪ͵ͬһʱ߻ǷͬһòָһοʹõڴСֽ㡣 + batch-size: 16384 + # ڴ滺ĴС + buffer-memory: 33554432 + # лʽ + key-serializer: org.apache.kafka.common.serialization.StringSerializer + # ֵлʽ + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # acks=0 ڳɹдϢ֮ǰȴκԷӦ + # acks=1 ֻҪȺڵյϢ߾ͻյһԷɹӦ + # acks=all ֻев븴ƵĽڵȫյϢʱ߲ŻյһԷijɹӦ + acks: all + consumer: + # Զύʱ spring boot 2.X 汾õֵΪDuration Ҫضĸʽ1S,1M,2H,5D + auto-commit-interval: 1S + # ָڶȡһûƫķƫЧ¸δ + # latestĬֵƫЧ£߽µļ¼ʼȡݣ֮ɵļ¼ + # earliest ƫЧ£߽ʼλöȡļ¼ + auto-offset-reset: earliest + # ǷԶύƫĬֵtrue,Ϊ˱ظݺݶʧ԰Ϊfalse,Ȼֶύƫ + enable-auto-commit: true + # ķлʽ + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + # ֵķлʽ + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + # е߳ + concurrency: 5 + diff --git a/spring-boot/spring-boot-kafka/src/test/java/com/heibaiying/springboot/KafkaTests.java b/spring-boot/spring-boot-kafka/src/test/java/com/heibaiying/springboot/KafkaTests.java new file mode 100644 index 0000000..82f8bcc --- /dev/null +++ b/spring-boot/spring-boot-kafka/src/test/java/com/heibaiying/springboot/KafkaTests.java @@ -0,0 +1,51 @@ +package com.heibaiying.springboot; + +import com.alibaba.fastjson.JSON; +import com.heibaiying.springboot.Producer.KafKaCustomrProducer; +import com.heibaiying.springboot.bean.Programmer; +import com.heibaiying.springboot.constant.Topic; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.Date; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class KafkaTests { + + @Autowired + private KafKaCustomrProducer producer; + + /*** + * 发送消息体为基本类型的消息 + */ + @Test + public void sendSimple() { + producer.sendMessage(Topic.SIMPLE, "hello spring boot kafka"); + } + + /*** + * 发送消息体为bean的消息 + */ + @Test + public void sendBean() { + Programmer programmer = new Programmer("xiaoming", 12, 21212.33f, new Date()); + producer.sendMessage(Topic.BEAN, JSON.toJSON(programmer).toString()); + } + + + /*** + * 多消费者组、组中多消费者对同一主题的消费情况 + */ + @Test + public void sendGroup() { + for (int i = 0; i < 5; i++) { + producer.sendMessage(Topic.GROUP, "hello group " + i); + } + } +} +