From 8992c35a6e3a1e925d7204e549a5199540ca9883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Fri, 4 Jan 2019 15:11:39 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=20rabbitmq=20=E7=94=A8?= =?UTF-8?q?=E4=BE=8B=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/RabbitmqConsumer.java | 12 +++++----- .../src/main/resources/application.yml | 10 ++++----- .../rabbitmq/producer/RabbitmqProducer.java | 22 +++++++++++++------ .../src/main/resources/application.yml | 8 +++---- .../rabbitmq/RabbitmqProducerTests.java | 6 +++++ 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java index 184094b..e9c254d 100644 --- a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/java/com/heibaiying/rabbitmqconsumer/consumer/RabbitmqConsumer.java @@ -20,18 +20,18 @@ import org.springframework.stereotype.Component; public class RabbitmqConsumer { @RabbitListener(bindings = @QueueBinding( - value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE), - exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE), - key = RabbitInfo.ROUTING_KEY) + value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE), + exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE), + key = RabbitInfo.ROUTING_KEY) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { MessageHeaders headers = message.getHeaders(); // 获取消息头信息和消息体 - log.info("msgInfo:{} ; payload:{} ",headers.get("msgInfo"),message.getPayload()); - // DELIVERY_TAG 是一个自增的数值 + log.info("msgInfo:{} ; payload:{} ", headers.get("msgInfo"), message.getPayload()); + // DELIVERY_TAG 代表 RabbitMQ 向该Channel投递的这条消息的唯一标识ID,是一个单调递增的正整数 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); - // 第二个参数代表是否一次签收多条,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 + // 第二个参数代表是否一次签收多条,当该参数为 true 时,则可以一次性确认 DELIVERY_TAG 小于等于传入值的所有消息 channel.basicAck(deliveryTag, false); } diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml index 6bab973..65acc3a 100644 --- a/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-consumer/src/main/resources/application.yml @@ -1,14 +1,14 @@ spring: rabbitmq: - addresses: 192.168.0.108:5672 + addresses: 127.0.0.1:5672 # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/" # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672 - username: admin - password: admin - virtual-host: my_vhost + username: guest + password: guest + virtual-host: / listener: simple: - # 签收模式设置为手工签收 + # 为了保证信息能够被正确消费,建议签收模式设置为手工签收,并在代码中实现手工签收 acknowledge-mode: manual # 侦听器调用者线程的最小数量 concurrency: 10 diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java index 7d4b3e4..114f650 100644 --- a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/java/com/heibaiying/rabbitmq/producer/RabbitmqProducer.java @@ -1,6 +1,7 @@ package com.heibaiying.rabbitmq.producer; import lombok.extern.slf4j.Slf4j; +import org.apache.tomcat.util.codec.binary.Base64; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +10,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; +import java.io.UnsupportedEncodingException; import java.util.Map; /** @@ -22,22 +24,28 @@ public class RabbitmqProducer { @Autowired private RabbitTemplate rabbitTemplate; - public void sendSimpleMessage( Map headers,Object message, + public void sendSimpleMessage(Map headers, Object message, String messageId, String exchangeName, String key) { // 自定义消息头 MessageHeaders messageHeaders = new MessageHeaders(headers); // 创建消息 Message msg = MessageBuilder.createMessage(message, messageHeaders); - // 确认的回调 确认消息是否到达 Broker 服务器 + /* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器 + 如果发送时候指定的交换器不存在 ack就是false 代表消息不可达 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { - //当ack为false时才表示消息不可达,才需要进行对应的消息补偿机制 - log.info("correlationData:{} , ack:{}", correlationData.getId(),ack); + log.info("correlationData:{} , ack:{}", correlationData.getId(), ack); + if (!ack) { + System.out.println("进行对应的消息补偿机制"); + } }); - // 消息失败的回调 + /* 消息失败的回调 + * 例如消息已经到达交换器上,但路由键匹配任何绑定到该交换器的队列,会触发这个回调,此时 replyText: NO_ROUTE + */ rabbitTemplate.setReturnCallback((message1, replyCode, replyText, exchange, routingKey) -> { - log.info("message:{}; replyCode{}; replyText{} ;", message1.getBody(), replyCode, replyText); + log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}", + message1, replyCode, replyText, exchange, routingKey); }); - // 在实际中ID 应该是全局唯一 能够唯一标识消息 + // 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理 CorrelationData correlationData = new CorrelationData(messageId); rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData); } diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml index 36aa8ec..eeb27f2 100644 --- a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/main/resources/application.yml @@ -1,11 +1,11 @@ spring: rabbitmq: - addresses: 192.168.0.108:5672 + addresses: 127.0.0.1:5672 # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/" # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672 - username: admin - password: admin - virtual-host: my_vhost + username: guest + password: guest + virtual-host: / # 是否启用发布者确认 具体确认回调实现见代码 publisher-confirms: true # 是否启用发布者返回 具体返回回调实现见代码 diff --git a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java index d787c74..5dd18f9 100644 --- a/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java +++ b/spring-boot/spring-boot-rabbitmq/rabbitmq-producer/src/test/java/com/heibaiying/rabbitmq/RabbitmqProducerTests.java @@ -21,6 +21,9 @@ public class RabbitmqProducerTests { @Autowired private RabbitmqProducer producer; + /*** + * 发送消息体为简单数据类型的消息 + */ @Test public void send() { Map heads = new HashMap<>(); @@ -31,6 +34,9 @@ public class RabbitmqProducerTests { } + /*** + * 发送消息体为bean的消息 + */ @Test public void sendBean() { String id = String.valueOf(Math.round(Math.random() * 10000));