From dbd515b8624443da86a3b3b9b5248bf812ea8c90 Mon Sep 17 00:00:00 2001 From: luoxiang <2806718453@qq.com> Date: Tue, 1 Jan 2019 14:57:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20spring=20boot=20rabbitMQ?= =?UTF-8?q?=20=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring-boot/spring-boot-rabbitmq/pom.xml | 52 +++++++++++++++++++ .../rabbitmq-common/pom.xml | 14 +++++ .../java/com/heibaiying/bean/Programmer.java | 26 ++++++++++ .../heibaiying/constant/RabbitBeanInfo.java | 19 +++++++ .../com/heibaiying/constant/RabbitInfo.java | 19 +++++++ .../rabbitmq-consumer/pom.xml | 39 ++++++++++++++ .../RabbitmqConsumerApplication.java | 14 +++++ .../consumer/RabbitmqBeanConsumer.java | 36 +++++++++++++ .../consumer/RabbitmqConsumer.java | 38 ++++++++++++++ .../src/main/resources/application.yml | 17 ++++++ .../RabbitmqConsumerApplicationTests.java | 17 ++++++ .../rabbitmq-producer/pom.xml | 39 ++++++++++++++ .../rabbitmq/RabbitmqProducerApplication.java | 14 +++++ .../rabbitmq/producer/RabbitmqProducer.java | 44 ++++++++++++++++ .../src/main/resources/application.yml | 18 +++++++ .../rabbitmq/RabbitmqProducerTests.java | 42 +++++++++++++++ 16 files changed, 448 insertions(+) create mode 100644 spring-boot/spring-boot-rabbitmq/pom.xml create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-common/pom.xml create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/bean/Programmer.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitBeanInfo.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitInfo.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/pom.xml create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplication.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqBeanConsumer.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/test/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplicationTests.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-producer/pom.xml create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/RabbitmqProducerApplication.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml create mode 100644 spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java diff --git a/spring-boot/spring-boot-rabbitmq/pom.xml b/spring-boot/spring-boot-rabbitmq/pom.xml new file mode 100644 index 0000000..66b8966 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/pom.xml @@ -0,0 +1,52 @@ + + + + 4.0.0 + pom + + + rabbitmq-consumer + rabbitmq-producer + rabbitmq-common + + + + org.springframework.boot + spring-boot-starter-parent + 2.1.1.RELEASE + + + com.heibaiying + spring-boot-rabbitmq + 0.0.1-SNAPSHOT + spring-boot-rabbitmq + RabbitMQ project for Spring Boot + + + 1.8 + + + + + org.springframework.boot + spring-boot-starter-amqp + + + org.springframework.boot + spring-boot-starter-web + + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-common/pom.xml b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/pom.xml new file mode 100644 index 0000000..057f7d3 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/pom.xml @@ -0,0 +1,14 @@ + + + + spring-boot-rabbitmq + com.heibaiying + 0.0.1-SNAPSHOT + + 4.0.0 + + rabbitmq-common + + \ No newline at end of file diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/bean/Programmer.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/bean/Programmer.java new file mode 100644 index 0000000..3750385 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/bean/Programmer.java @@ -0,0 +1,26 @@ +package com.heibaiying.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author : heibaiying + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +// 需要实现序列化接口 +public class Programmer implements Serializable { + + private String name; + + private int age; + + private float salary; + + private Date birthday; +} diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitBeanInfo.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitBeanInfo.java new file mode 100644 index 0000000..9cb8bd4 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitBeanInfo.java @@ -0,0 +1,19 @@ +package com.heibaiying.constant; + +/** + * @author : heibaiying + * @description : rabbit 公用配置信息 + */ +public class RabbitBeanInfo { + + // queue 配置 + public static final String QUEUE_NAME = "spring.boot.bean.queue"; + public static final String QUEUE_DURABLE = "true"; + + // exchange 配置 + public static final String EXCHANGE_NAME = "spring.boot.bean.exchange"; + public static final String EXCHANGE_TYPE = "direct"; + + // routing key + public static final String ROUTING_KEY = "springboot.bean"; +} diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitInfo.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitInfo.java new file mode 100644 index 0000000..61f3e8a --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-common/src/main/java/com/heibaiying/constant/RabbitInfo.java @@ -0,0 +1,19 @@ +package com.heibaiying.constant; + +/** + * @author : heibaiying + * @description : rabbit 公用配置信息 + */ +public class RabbitInfo { + + // queue 配置 + public static final String QUEUE_NAME = "spring.boot.simple.queue"; + public static final String QUEUE_DURABLE = "true"; + + // exchange 配置 + public static final String EXCHANGE_NAME = "spring.boot.simple.exchange"; + public static final String EXCHANGE_TYPE = "topic"; + + // routing key + public static final String ROUTING_KEY = "springboot.simple.*"; +} diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/pom.xml b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/pom.xml new file mode 100644 index 0000000..56cbcea --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + com.heibaiying + spring-boot-rabbitmq + 0.0.1-SNAPSHOT + + + rabbitmq-consumer + 0.0.1-SNAPSHOT + rabbitmq-consumer + RabbitMQ consumer project for Spring Boot + + + 1.8 + + + + + com.heibaiying + rabbitmq-common + 0.0.1-SNAPSHOT + compile + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplication.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplication.java new file mode 100644 index 0000000..657ae5d --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplication.java @@ -0,0 +1,14 @@ +package com.heibaiying.rabbitmqconsumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class RabbitmqConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(RabbitmqConsumerApplication.class, args); + } + +} + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqBeanConsumer.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqBeanConsumer.java new file mode 100644 index 0000000..847b5a2 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqBeanConsumer.java @@ -0,0 +1,36 @@ +package com.heibaiying.rabbitmqconsumer.consumer; + + +import com.heibaiying.bean.Programmer; +import com.heibaiying.constant.RabbitBeanInfo; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.*; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @author : heibaiying + * @description : 消息是对象的消费者 + */ + +@Component +@Slf4j +public class RabbitmqBeanConsumer { + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitBeanInfo.QUEUE_NAME, durable = RabbitBeanInfo.QUEUE_DURABLE), + exchange = @Exchange(value = RabbitBeanInfo.EXCHANGE_NAME, type = RabbitBeanInfo.EXCHANGE_TYPE), + key = RabbitBeanInfo.ROUTING_KEY) + ) + @RabbitHandler + public void onMessage(@Payload Programmer programmer, @Headers Map headers, Channel channel) throws Exception { + log.info("programmer:{} ", programmer); + Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); + channel.basicAck(deliveryTag, false); + } +} diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java new file mode 100644 index 0000000..184094b --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java @@ -0,0 +1,38 @@ +package com.heibaiying.rabbitmqconsumer.consumer; + + +import com.heibaiying.constant.RabbitInfo; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.*; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Component; + +/** + * @author : heibaiying + * @description : 消息消费者 + */ + +@Component +@Slf4j +public class RabbitmqConsumer { + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE), + exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE), + key = RabbitInfo.ROUTING_KEY) + ) + @RabbitHandler + public void onMessage(Message message, Channel channel) throws Exception { + MessageHeaders headers = message.getHeaders(); + // 获取消息头信息和消息体 + log.info("msgInfo:{} ; payload:{} ",headers.get("msgInfo"),message.getPayload()); + // DELIVERY_TAG 是一个自增的数值 + Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); + // 第二个参数代表是否一次签收多条,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 + channel.basicAck(deliveryTag, false); + } + +} diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml new file mode 100644 index 0000000..6bab973 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml @@ -0,0 +1,17 @@ +spring: + rabbitmq: + addresses: 192.168.0.108:5672 + # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/" + # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672 + username: admin + password: admin + virtual-host: my_vhost + listener: + simple: + # 签收模式设置为手工签收 + acknowledge-mode: manual + # 侦听器调用者线程的最小数量 + concurrency: 10 + # 侦听器调用者线程的最大数量 + max-concurrency: 50 + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/test/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplicationTests.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/test/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplicationTests.java new file mode 100644 index 0000000..d0ec1fc --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/test/java/com/heibaiying/rabbitmqconsumer/RabbitmqConsumerApplicationTests.java @@ -0,0 +1,17 @@ +package com.heibaiying.rabbitmqconsumer; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class RabbitmqConsumerApplicationTests { + + @Test + public void contextLoads() { + } + +} + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/pom.xml b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/pom.xml new file mode 100644 index 0000000..1807dc4 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + com.heibaiying + spring-boot-rabbitmq + 0.0.1-SNAPSHOT + + + rabbitmq-producer + 0.0.1-SNAPSHOT + rabbitmq-producer + RabbitMQ producer project for Spring Boot + + + 1.8 + + + + + com.heibaiying + rabbitmq-common + 0.0.1-SNAPSHOT + compile + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/RabbitmqProducerApplication.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/RabbitmqProducerApplication.java new file mode 100644 index 0000000..5c463cb --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/RabbitmqProducerApplication.java @@ -0,0 +1,14 @@ +package com.heibaiying.rabbitmq; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class RabbitmqProducerApplication { + + public static void main(String[] args) { + SpringApplication.run(RabbitmqProducerApplication.class, args); + } + +} + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java new file mode 100644 index 0000000..7d4b3e4 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java @@ -0,0 +1,44 @@ +package com.heibaiying.rabbitmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @author : heibaiying + * @description : 消息生产者 + */ +@Component +@Slf4j +public class RabbitmqProducer { + + @Autowired + private RabbitTemplate rabbitTemplate; + + public void sendSimpleMessage( Map headers,Object message, + String messageId, String exchangeName, String key) { + // 自定义消息头 + MessageHeaders messageHeaders = new MessageHeaders(headers); + // 创建消息 + Message msg = MessageBuilder.createMessage(message, messageHeaders); + // 确认的回调 确认消息是否到达 Broker 服务器 + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + //当ack为false时才表示消息不可达,才需要进行对应的消息补偿机制 + log.info("correlationData:{} , ack:{}", correlationData.getId(),ack); + }); + // 消息失败的回调 + rabbitTemplate.setReturnCallback((message1, replyCode, replyText, exchange, routingKey) -> { + log.info("message:{}; replyCode{}; replyText{} ;", message1.getBody(), replyCode, replyText); + }); + // 在实际中ID 应该是全局唯一 能够唯一标识消息 + CorrelationData correlationData = new CorrelationData(messageId); + rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData); + } +} diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml new file mode 100644 index 0000000..36aa8ec --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml @@ -0,0 +1,18 @@ +spring: + rabbitmq: + addresses: 192.168.0.108:5672 + # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/" + # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672 + username: admin + password: admin + virtual-host: my_vhost + # 是否启用发布者确认 具体确认回调实现见代码 + publisher-confirms: true + # 是否启用发布者返回 具体返回回调实现见代码 + publisher-returns: true + # 是否启用强制消息 保证消息的有效监听 + template.mandatory: true + +server: + port: 8090 + diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java new file mode 100644 index 0000000..d787c74 --- /dev/null +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java @@ -0,0 +1,42 @@ +package com.heibaiying.rabbitmq; + +import com.heibaiying.bean.Programmer; +import com.heibaiying.constant.RabbitBeanInfo; +import com.heibaiying.constant.RabbitInfo; +import com.heibaiying.rabbitmq.producer.RabbitmqProducer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class RabbitmqProducerTests { + + @Autowired + private RabbitmqProducer producer; + + @Test + public void send() { + Map heads = new HashMap<>(); + heads.put("msgInfo", "自定义消息头信息"); + // 模拟生成消息ID,在实际中应该是全局唯一的 消息不可达时候可以在setConfirmCallback回调中取得,可以进行对应的重发或错误处理 + String id = String.valueOf(Math.round(Math.random() * 10000)); + producer.sendSimpleMessage(heads, "hello Spring", id, RabbitInfo.EXCHANGE_NAME, "springboot.simple.abc"); + } + + + @Test + public void sendBean() { + String id = String.valueOf(Math.round(Math.random() * 10000)); + Programmer programmer = new Programmer("xiaoMing", 12, 12123.45f, new Date()); + producer.sendSimpleMessage(null, programmer, id, RabbitBeanInfo.EXCHANGE_NAME, RabbitBeanInfo.ROUTING_KEY); + } + +} +