增加 spring boot rabbitMQ 用例
This commit is contained in:
parent
4484b8d2af
commit
dbd515b862
52
spring-boot/spring-boot-rabbitmq/pom.xml
Normal file
52
spring-boot/spring-boot-rabbitmq/pom.xml
Normal file
@ -0,0 +1,52 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
<module>rabbitmq-consumer</module>
|
||||
<module>rabbitmq-producer</module>
|
||||
<module>rabbitmq-common</module>
|
||||
</modules>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.1.1.RELEASE</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>spring-boot-rabbitmq</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>spring-boot-rabbitmq</name>
|
||||
<description>RabbitMQ project for Spring Boot</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
14
spring-boot/spring-boot-rabbitmq/rabbitmq-common/pom.xml
Normal file
14
spring-boot/spring-boot-rabbitmq/rabbitmq-common/pom.xml
Normal file
@ -0,0 +1,14 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>spring-boot-rabbitmq</artifactId>
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>rabbitmq-common</artifactId>
|
||||
|
||||
</project>
|
@ -0,0 +1,26 @@
|
||||
package com.heibaiying.bean;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* @author : heibaiying
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
// 需要实现序列化接口
|
||||
public class Programmer implements Serializable {
|
||||
|
||||
private String name;
|
||||
|
||||
private int age;
|
||||
|
||||
private float salary;
|
||||
|
||||
private Date birthday;
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.heibaiying.constant;
|
||||
|
||||
/**
|
||||
* @author : heibaiying
|
||||
* @description : rabbit 公用配置信息
|
||||
*/
|
||||
public class RabbitBeanInfo {
|
||||
|
||||
// queue 配置
|
||||
public static final String QUEUE_NAME = "spring.boot.bean.queue";
|
||||
public static final String QUEUE_DURABLE = "true";
|
||||
|
||||
// exchange 配置
|
||||
public static final String EXCHANGE_NAME = "spring.boot.bean.exchange";
|
||||
public static final String EXCHANGE_TYPE = "direct";
|
||||
|
||||
// routing key
|
||||
public static final String ROUTING_KEY = "springboot.bean";
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.heibaiying.constant;
|
||||
|
||||
/**
|
||||
* @author : heibaiying
|
||||
* @description : rabbit 公用配置信息
|
||||
*/
|
||||
public class RabbitInfo {
|
||||
|
||||
// queue 配置
|
||||
public static final String QUEUE_NAME = "spring.boot.simple.queue";
|
||||
public static final String QUEUE_DURABLE = "true";
|
||||
|
||||
// exchange 配置
|
||||
public static final String EXCHANGE_NAME = "spring.boot.simple.exchange";
|
||||
public static final String EXCHANGE_TYPE = "topic";
|
||||
|
||||
// routing key
|
||||
public static final String ROUTING_KEY = "springboot.simple.*";
|
||||
}
|
39
spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/pom.xml
Normal file
39
spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/pom.xml
Normal file
@ -0,0 +1,39 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>spring-boot-rabbitmq</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>rabbitmq-consumer</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>rabbitmq-consumer</name>
|
||||
<description>RabbitMQ consumer project for Spring Boot</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>rabbitmq-common</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -0,0 +1,14 @@
|
||||
package com.heibaiying.rabbitmqconsumer;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class RabbitmqConsumerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(RabbitmqConsumerApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,36 @@
|
||||
package com.heibaiying.rabbitmqconsumer.consumer;
|
||||
|
||||
|
||||
import com.heibaiying.bean.Programmer;
|
||||
import com.heibaiying.constant.RabbitBeanInfo;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.*;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.messaging.handler.annotation.Headers;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author : heibaiying
|
||||
* @description : 消息是对象的消费者
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RabbitmqBeanConsumer {
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(value = RabbitBeanInfo.QUEUE_NAME, durable = RabbitBeanInfo.QUEUE_DURABLE),
|
||||
exchange = @Exchange(value = RabbitBeanInfo.EXCHANGE_NAME, type = RabbitBeanInfo.EXCHANGE_TYPE),
|
||||
key = RabbitBeanInfo.ROUTING_KEY)
|
||||
)
|
||||
@RabbitHandler
|
||||
public void onMessage(@Payload Programmer programmer, @Headers Map<String, Object> headers, Channel channel) throws Exception {
|
||||
log.info("programmer:{} ", programmer);
|
||||
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
|
||||
channel.basicAck(deliveryTag, false);
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
package com.heibaiying.rabbitmqconsumer.consumer;
|
||||
|
||||
|
||||
import com.heibaiying.constant.RabbitInfo;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.*;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author : heibaiying
|
||||
* @description : 消息消费者
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RabbitmqConsumer {
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding(
|
||||
value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE),
|
||||
exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE),
|
||||
key = RabbitInfo.ROUTING_KEY)
|
||||
)
|
||||
@RabbitHandler
|
||||
public void onMessage(Message message, Channel channel) throws Exception {
|
||||
MessageHeaders headers = message.getHeaders();
|
||||
// 获取消息头信息和消息体
|
||||
log.info("msgInfo:{} ; payload:{} ",headers.get("msgInfo"),message.getPayload());
|
||||
// DELIVERY_TAG 是一个自增的数值
|
||||
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
|
||||
// 第二个参数代表是否一次签收多条,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
|
||||
channel.basicAck(deliveryTag, false);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
spring:
|
||||
rabbitmq:
|
||||
addresses: 192.168.0.108:5672
|
||||
# RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
|
||||
# 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
|
||||
username: admin
|
||||
password: admin
|
||||
virtual-host: my_vhost
|
||||
listener:
|
||||
simple:
|
||||
# 签收模式设置为手工签收
|
||||
acknowledge-mode: manual
|
||||
# 侦听器调用者线程的最小数量
|
||||
concurrency: 10
|
||||
# 侦听器调用者线程的最大数量
|
||||
max-concurrency: 50
|
||||
|
@ -0,0 +1,17 @@
|
||||
package com.heibaiying.rabbitmqconsumer;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
public class RabbitmqConsumerApplicationTests {
|
||||
|
||||
@Test
|
||||
public void contextLoads() {
|
||||
}
|
||||
|
||||
}
|
||||
|
39
spring-boot/spring-boot-rabbitmq/rabbitmq-producer/pom.xml
Normal file
39
spring-boot/spring-boot-rabbitmq/rabbitmq-producer/pom.xml
Normal file
@ -0,0 +1,39 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>spring-boot-rabbitmq</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>rabbitmq-producer</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>rabbitmq-producer</name>
|
||||
<description>RabbitMQ producer project for Spring Boot</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>rabbitmq-common</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -0,0 +1,14 @@
|
||||
package com.heibaiying.rabbitmq;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class RabbitmqProducerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(RabbitmqProducerApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,44 @@
|
||||
package com.heibaiying.rabbitmq.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author : heibaiying
|
||||
* @description : 消息生产者
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RabbitmqProducer {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
public void sendSimpleMessage( Map<String, Object> headers,Object message,
|
||||
String messageId, String exchangeName, String key) {
|
||||
// 自定义消息头
|
||||
MessageHeaders messageHeaders = new MessageHeaders(headers);
|
||||
// 创建消息
|
||||
Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);
|
||||
// 确认的回调 确认消息是否到达 Broker 服务器
|
||||
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
|
||||
//当ack为false时才表示消息不可达,才需要进行对应的消息补偿机制
|
||||
log.info("correlationData:{} , ack:{}", correlationData.getId(),ack);
|
||||
});
|
||||
// 消息失败的回调
|
||||
rabbitTemplate.setReturnCallback((message1, replyCode, replyText, exchange, routingKey) -> {
|
||||
log.info("message:{}; replyCode{}; replyText{} ;", message1.getBody(), replyCode, replyText);
|
||||
});
|
||||
// 在实际中ID 应该是全局唯一 能够唯一标识消息
|
||||
CorrelationData correlationData = new CorrelationData(messageId);
|
||||
rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData);
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
spring:
|
||||
rabbitmq:
|
||||
addresses: 192.168.0.108:5672
|
||||
# RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
|
||||
# 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
|
||||
username: admin
|
||||
password: admin
|
||||
virtual-host: my_vhost
|
||||
# 是否启用发布者确认 具体确认回调实现见代码
|
||||
publisher-confirms: true
|
||||
# 是否启用发布者返回 具体返回回调实现见代码
|
||||
publisher-returns: true
|
||||
# 是否启用强制消息 保证消息的有效监听
|
||||
template.mandatory: true
|
||||
|
||||
server:
|
||||
port: 8090
|
||||
|
@ -0,0 +1,42 @@
|
||||
package com.heibaiying.rabbitmq;
|
||||
|
||||
import com.heibaiying.bean.Programmer;
|
||||
import com.heibaiying.constant.RabbitBeanInfo;
|
||||
import com.heibaiying.constant.RabbitInfo;
|
||||
import com.heibaiying.rabbitmq.producer.RabbitmqProducer;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
public class RabbitmqProducerTests {
|
||||
|
||||
@Autowired
|
||||
private RabbitmqProducer producer;
|
||||
|
||||
@Test
|
||||
public void send() {
|
||||
Map<String, Object> heads = new HashMap<>();
|
||||
heads.put("msgInfo", "自定义消息头信息");
|
||||
// 模拟生成消息ID,在实际中应该是全局唯一的 消息不可达时候可以在setConfirmCallback回调中取得,可以进行对应的重发或错误处理
|
||||
String id = String.valueOf(Math.round(Math.random() * 10000));
|
||||
producer.sendSimpleMessage(heads, "hello Spring", id, RabbitInfo.EXCHANGE_NAME, "springboot.simple.abc");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void sendBean() {
|
||||
String id = String.valueOf(Math.round(Math.random() * 10000));
|
||||
Programmer programmer = new Programmer("xiaoMing", 12, 12123.45f, new Date());
|
||||
producer.sendSimpleMessage(null, programmer, id, RabbitBeanInfo.EXCHANGE_NAME, RabbitBeanInfo.ROUTING_KEY);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user