# spring boot 整合 rabbitmq
## 目录
一、 项目结构说明
二、关键依赖
三、公共模块(rabbitmq-common)
四、服务消费者(rabbitmq-consumer)
4.1 消息消费者配置
4.2 使用注解@RabbitListener和@RabbitHandler创建消息监听者
五、 消息生产者(rabbitmq-producer)
5.1 消息生产者配置
5.2 创建消息生产者
5.3 以单元测试的方式发送消息
六、项目构建的说明
## 正文
## 一、 项目结构说明
1.1 之前关于spring 整合 rabbitmq 我们采用的是单项目的方式,为了使得用例更具有实际意义,这里采用maven多模块的构建方式,在spring-boot-rabbitmq下构建三个子模块:
1. rabbitmq-common 是公共模块,用于存放公共的接口、配置和bean,被rabbitmq-producer和rabbitmq-consumer在pom.xml中引用;
2. rabbitmq-producer 是消息的生产者模块;
3. rabbitmq-consumer是消息的消费者模块。
1.2 关于rabbitmq安装、交换机、队列、死信队列等基本概念可以参考我的手记[《RabbitMQ实战指南》读书笔记](https://github.com/heibaiying/LearningNotes/blob/master/notes/%E4%B8%AD%E9%97%B4%E4%BB%B6/RabbitMQ/%E3%80%8ARabbitMQ%E5%AE%9E%E6%88%98%E6%8C%87%E5%8D%97%E3%80%8B%E8%AF%BB%E4%B9%A6%E7%AC%94%E8%AE%B0.md),里面有详细的配图说明。
## 二、关键依赖
在父工程的项目中统一导入依赖rabbitmq的starter(spring-boot-starter-amqp),父工程的pom.xml如下
```xml
4.0.0
pom
rabbitmq-consumer
rabbitmq-producer
rabbitmq-common
org.springframework.boot
spring-boot-starter-parent
2.1.1.RELEASE
com.heibaiying
spring-boot-rabbitmq
0.0.1-SNAPSHOT
spring-boot-rabbitmq
RabbitMQ project for Spring Boot
1.8
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
```
## 三、公共模块(rabbitmq-common)
- bean 下为公共的实体类。
- constant 下为公共配置,用静态常量引用。(这里我使用静态常量是为了方便引用,实际中也可以按照情况,抽取为公共配置文件)
```java
package com.heibaiying.constant;
/**
* @author : heibaiying
* @description : rabbit 公用配置信息
*/
public class RabbitInfo {
// queue 配置
public static final String QUEUE_NAME = "spring.boot.simple.queue";
public static final String QUEUE_DURABLE = "true";
// exchange 配置
public static final String EXCHANGE_NAME = "spring.boot.simple.exchange";
public static final String EXCHANGE_TYPE = "topic";
// routing key
public static final String ROUTING_KEY = "springboot.simple.*";
}
```
## 四、服务消费者(rabbitmq-consumer)
#### 4.1 消息消费者配置
```yaml
spring:
rabbitmq:
addresses: 127.0.0.1:5672
# RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
# 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
username: guest
password: guest
virtual-host: /
listener:
simple:
# 为了保证信息能够被正确消费,建议签收模式设置为手工签收,并在代码中实现手工签收
acknowledge-mode: manual
# 侦听器调用者线程的最小数量
concurrency: 10
# 侦听器调用者线程的最大数量
max-concurrency: 50
```
#### 4.2 使用注解@RabbitListener和@RabbitHandler创建消息监听者
1. 使用注解创建的交换机、队列、和绑定关系会在项目初始化的时候自动创建,但是不会重复创建;
2. 这里我们创建两个消息监听器,分别演示消息是基本类型和消息是对象时的配置区别。
```java
/**
* @author : heibaiying
* @description : 消息是对象的消费者
*/
@Component
@Slf4j
public class RabbitmqBeanConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitBeanInfo.QUEUE_NAME, durable = RabbitBeanInfo.QUEUE_DURABLE),
exchange = @Exchange(value = RabbitBeanInfo.EXCHANGE_NAME, type = RabbitBeanInfo.EXCHANGE_TYPE),
key = RabbitBeanInfo.ROUTING_KEY)
)
@RabbitHandler
public void onMessage(@Payload Programmer programmer, @Headers Map headers, Channel channel) throws Exception {
log.info("programmer:{} ", programmer);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
```
```java
@Component
@Slf4j
public class RabbitmqConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE),
exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE),
key = RabbitInfo.ROUTING_KEY)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
MessageHeaders headers = message.getHeaders();
// 获取消息头信息和消息体
log.info("msgInfo:{} ; payload:{} ", headers.get("msgInfo"), message.getPayload());
// DELIVERY_TAG 代表 RabbitMQ 向该Channel投递的这条消息的唯一标识ID,是一个单调递增的正整数
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 第二个参数代表是否一次签收多条,当该参数为 true 时,则可以一次性确认 DELIVERY_TAG 小于等于传入值的所有消息
channel.basicAck(deliveryTag, false);
}
}
```
## 五、 消息生产者(rabbitmq-producer)
#### 5.1 消息生产者配置
```yaml
spring:
rabbitmq:
addresses: 127.0.0.1:5672
# RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
# 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
username: guest
password: guest
virtual-host: /
# 是否启用发布者确认 具体确认回调实现见代码
publisher-confirms: true
# 是否启用发布者返回 具体返回回调实现见代码
publisher-returns: true
# 是否启用强制消息 保证消息的有效监听
template.mandatory: true
server:
port: 8090
```
#### 5.2 创建消息生产者
```java
/**
* @author : heibaiying
* @description : 消息生产者
*/
@Component
@Slf4j
public class RabbitmqProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendSimpleMessage(Map headers, Object message,
String messageId, String exchangeName, String key) {
// 自定义消息头
MessageHeaders messageHeaders = new MessageHeaders(headers);
// 创建消息
Message