完善 rabbitmq 用例注释

This commit is contained in:
罗祥 2019-01-04 15:11:39 +08:00
parent 86cfd2b61b
commit 8992c35a6e
5 changed files with 36 additions and 22 deletions

View File

@ -20,18 +20,18 @@ import org.springframework.stereotype.Component;
public class RabbitmqConsumer { public class RabbitmqConsumer {
@RabbitListener(bindings = @QueueBinding( @RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE), value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE),
exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE), exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE),
key = RabbitInfo.ROUTING_KEY) key = RabbitInfo.ROUTING_KEY)
) )
@RabbitHandler @RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception { public void onMessage(Message message, Channel channel) throws Exception {
MessageHeaders headers = message.getHeaders(); MessageHeaders headers = message.getHeaders();
// 获取消息头信息和消息体 // 获取消息头信息和消息体
log.info("msgInfo:{} ; payload:{} ",headers.get("msgInfo"),message.getPayload()); log.info("msgInfo:{} ; payload:{} ", headers.get("msgInfo"), message.getPayload());
// DELIVERY_TAG 是一个自增的数值 // DELIVERY_TAG 代表 RabbitMQ 向该Channel投递的这条消息的唯一标识ID是一个单调递增的正整数
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 第二个参数代表是否一次签收多条,当该参数为 true 则可以一次性确认 delivery_tag 小于等于传入值的所有消息 // 第二个参数代表是否一次签收多条,当该参数为 true 则可以一次性确认 DELIVERY_TAG 小于等于传入值的所有消息
channel.basicAck(deliveryTag, false); channel.basicAck(deliveryTag, false);
} }

View File

@ -1,14 +1,14 @@
spring: spring:
rabbitmq: rabbitmq:
addresses: 192.168.0.108:5672 addresses: 127.0.0.1:5672
# RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/" # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
# 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672 # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
username: admin username: guest
password: admin password: guest
virtual-host: my_vhost virtual-host: /
listener: listener:
simple: simple:
# 签收模式设置为手工签收 # 为了保证信息能够被正确消费,建议签收模式设置为手工签收,并在代码中实现手工签收
acknowledge-mode: manual acknowledge-mode: manual
# 侦听器调用者线程的最小数量 # 侦听器调用者线程的最小数量
concurrency: 10 concurrency: 10

View File

@ -1,6 +1,7 @@
package com.heibaiying.rabbitmq.producer; package com.heibaiying.rabbitmq.producer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.codec.binary.Base64;
import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -9,6 +10,7 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.Map; import java.util.Map;
/** /**
@ -22,22 +24,28 @@ public class RabbitmqProducer {
@Autowired @Autowired
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
public void sendSimpleMessage( Map<String, Object> headers,Object message, public void sendSimpleMessage(Map<String, Object> headers, Object message,
String messageId, String exchangeName, String key) { String messageId, String exchangeName, String key) {
// 自定义消息头 // 自定义消息头
MessageHeaders messageHeaders = new MessageHeaders(headers); MessageHeaders messageHeaders = new MessageHeaders(headers);
// 创建消息 // 创建消息
Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders); Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);
// 确认的回调 确认消息是否到达 Broker 服务器 /* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器
如果发送时候指定的交换器不存在 ack就是false 代表消息不可达 */
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
//当ack为false时才表示消息不可达才需要进行对应的消息补偿机制 log.info("correlationData{} , ack:{}", correlationData.getId(), ack);
log.info("correlationData{} , ack:{}", correlationData.getId(),ack); if (!ack) {
System.out.println("进行对应的消息补偿机制");
}
}); });
// 消息失败的回调 /* 消息失败的回调
* 例如消息已经到达交换器上但路由键匹配任何绑定到该交换器的队列会触发这个回调此时 replyText: NO_ROUTE
*/
rabbitTemplate.setReturnCallback((message1, replyCode, replyText, exchange, routingKey) -> { rabbitTemplate.setReturnCallback((message1, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{}; replyCode{}; replyText{} ;", message1.getBody(), replyCode, replyText); log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
message1, replyCode, replyText, exchange, routingKey);
}); });
// 在实际中ID 应该是全局唯一 能够唯一标识消息 // 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值进行对应的错误处理
CorrelationData correlationData = new CorrelationData(messageId); CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData); rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData);
} }

View File

@ -1,11 +1,11 @@
spring: spring:
rabbitmq: rabbitmq:
addresses: 192.168.0.108:5672 addresses: 127.0.0.1:5672
# RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/" # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
# 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672 # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
username: admin username: guest
password: admin password: guest
virtual-host: my_vhost virtual-host: /
# 是否启用发布者确认 具体确认回调实现见代码 # 是否启用发布者确认 具体确认回调实现见代码
publisher-confirms: true publisher-confirms: true
# 是否启用发布者返回 具体返回回调实现见代码 # 是否启用发布者返回 具体返回回调实现见代码

View File

@ -21,6 +21,9 @@ public class RabbitmqProducerTests {
@Autowired @Autowired
private RabbitmqProducer producer; private RabbitmqProducer producer;
/***
* 发送消息体为简单数据类型的消息
*/
@Test @Test
public void send() { public void send() {
Map<String, Object> heads = new HashMap<>(); Map<String, Object> heads = new HashMap<>();
@ -31,6 +34,9 @@ public class RabbitmqProducerTests {
} }
/***
* 发送消息体为bean的消息
*/
@Test @Test
public void sendBean() { public void sendBean() {
String id = String.valueOf(Math.round(Math.random() * 10000)); String id = String.valueOf(Math.round(Math.random() * 10000));