From ea0cbeace06140b0c9a23c6ac2e3e80a39225d14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Wed, 26 Dec 2018 17:54:39 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0rabbitMQ=20=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +- spring/spring-rabbitmq-annotation/pom.xml | 88 +++++++++++++++ .../com/heibaiying/bean/ProductManager.java | 23 ++++ .../java/com/heibaiying/bean/Programmer.java | 25 +++++ .../java/com/heibaiying/constant/Type.java | 12 ++ .../heibaiying/rabbit/RabbitBaseConfig.java | 57 ++++++++++ .../rabbit/config/MessageDelegate.java | 20 ++++ .../rabbit/config/RabbitBaseAnnotation.java | 106 ++++++++++++++++++ .../rabbit/config/RabbitObjectAnnotation.java | 75 +++++++++++++ .../rabbit/config/RabbitProperty.java | 29 +++++ .../src/main/resources/rabbitmq.properties | 5 + .../rabbit/RabbitSendObjectTest.java | 58 ++++++++++ .../com/heibaiying/rabbit/RabbitTest.java | 51 +++++++++ spring/spring-rabbitmq/pom.xml | 88 +++++++++++++++ .../com/heibaiying/bean/ProductManager.java | 23 ++++ .../java/com/heibaiying/bean/Programmer.java | 25 +++++ .../java/com/heibaiying/constant/Type.java | 12 ++ .../heibaiying/rabbit/MessageDelegate.java | 20 ++++ .../rabbit/RabbitBaseAnnotation.java | 102 +++++++++++++++++ .../rabbit/RabbitObjectAnnotation.java | 73 ++++++++++++ .../src/main/resources/rabbitmq.properties | 5 + .../src/main/resources/rabbitmq.xml | 43 +++++++ .../rabbit/RabbitSendObjectTest.java | 58 ++++++++++ .../com/heibaiying/rabbit/RabbitTest.java | 51 +++++++++ 24 files changed, 1052 insertions(+), 3 deletions(-) create mode 100644 spring/spring-rabbitmq-annotation/pom.xml create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/ProductManager.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/Programmer.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/constant/Type.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/RabbitBaseConfig.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/MessageDelegate.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitBaseAnnotation.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitObjectAnnotation.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitProperty.java create mode 100644 spring/spring-rabbitmq-annotation/src/main/resources/rabbitmq.properties create mode 100644 spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java create mode 100644 spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitTest.java create mode 100644 spring/spring-rabbitmq/pom.xml create mode 100644 spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/ProductManager.java create mode 100644 spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/Programmer.java create mode 100644 spring/spring-rabbitmq/src/main/java/com/heibaiying/constant/Type.java create mode 100644 spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/MessageDelegate.java create mode 100644 spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitBaseAnnotation.java create mode 100644 spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitObjectAnnotation.java create mode 100644 spring/spring-rabbitmq/src/main/resources/rabbitmq.properties create mode 100644 spring/spring-rabbitmq/src/main/resources/rabbitmq.xml create mode 100644 spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java create mode 100644 spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitTest.java diff --git a/README.md b/README.md index 0b8890d..f113b22 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,9 @@ spring-cloud:Finchley.SR2 | [spring-jdbc](https://github.com/heibaiying/spring-samples-for-all/tree/master/spring/spring-jdbc)
[spring-jdbc-annotation](https://github.com/heibaiying/spring-samples-for-all/tree/master/spring/spring-jdbc-annotation) | spring jdbc-template 的使用 | [Using JdbcTemplat](https://docs.spring.io/spring/docs/5.1.3.RELEASE/spring-framework-reference/data-access.html#jdbc-JdbcTemplate) | | [spring-mybatis](https://github.com/heibaiying/spring-samples-for-all/tree/master/spring/spring-mybatis)
[spring-mybatis-annotation](https://github.com/heibaiying/spring-samples-for-all/tree/master/spring/spring-mybatis-annotation) | spring 整合 mybatis | [mybatis-spring中文文档](http://www.mybatis.org/spring/zh/index.html) | | [spring-druid-mybatis](https://github.com/heibaiying/spring-samples-for-all/tree/master/spring/spring-druid-mybatis)
[spring-druid-mybatis-annotation](https://github.com/heibaiying/spring-samples-for-all/tree/master/spring/spring-druid-mybatis-annotation) | spring 整合druid、mybatis | [alibaba druid](https://github.com/alibaba/druid/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98) | -| spring-redis
spring-redis-annotation | spring 整合 redis 单机、集群(jedis)
spring 整合 redis 单机、集群(redisson) | [redisson中文文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | -| spring-mongodb
spring-mongodb-annotation | spring 整合 mongodb | | -| spring-memcached
spring-memcached-annotation | spring 整合 memcached | [Xmemcached中文文档](https://github.com/killme2008/xmemcached/wiki/Xmemcached%20%E4%B8%AD%E6%96%87%E7%94%A8%E6%88%B7%E6%8C%87%E5%8D%97) | +| spring-redis
spring-redis-annotation | spring 整合 redis 单机、集群(jedis客户端)
spring 整合 redis 单机、集群(redisson客户端) | [redisson中文文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | +| spring-mongodb
spring-mongodb-annotation | spring 整合 mongodb | [Spring Data MongoDB](https://docs.spring.io/spring-data/mongodb/docs/2.1.3.RELEASE/reference/html/#mongo.mongo-db-factory-java) | +| spring-memcached
spring-memcached-annotation | spring 整合 memcached(单机、集群) | [Xmemcached中文文档](https://github.com/killme2008/xmemcached/wiki/Xmemcached%20%E4%B8%AD%E6%96%87%E7%94%A8%E6%88%B7%E6%8C%87%E5%8D%97) | | spring-rabbitmq
spring-rabbitmq-annotation | spring 整合 rabbitmq | | | spring-kafka
spring-kafka-annotation | spring 整合 kafka | | | spring-dubbo
spring-dubbo-annotation | spring 整合 dubbo | | diff --git a/spring/spring-rabbitmq-annotation/pom.xml b/spring/spring-rabbitmq-annotation/pom.xml new file mode 100644 index 0000000..4bac506 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + com.heibaiying + spring-rabbitmq + 1.0-SNAPSHOT + + + 5.1.3.RELEASE + 1.8 + 1.8 + + + + + org.springframework + spring-context + ${spring-base-version} + + + org.springframework + spring-beans + ${spring-base-version} + + + org.springframework + spring-core + ${spring-base-version} + + + org.springframework + spring-web + ${spring-base-version} + + + org.springframework + spring-webmvc + ${spring-base-version} + + + + org.springframework.amqp + spring-rabbit + 2.1.2.RELEASE + + + + com.fasterxml.jackson.core + jackson-databind + 2.9.8 + + + + junit + junit + 4.12 + test + + + org.springframework + spring-test + ${spring-base-version} + test + + + org.projectlombok + lombok + 1.18.4 + provided + + + + + spring-rabbitmq + + + src/main/resources + + + src/main/java + + + + + \ No newline at end of file diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/ProductManager.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/ProductManager.java new file mode 100644 index 0000000..27e62f5 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/ProductManager.java @@ -0,0 +1,23 @@ +package com.heibaiying.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * @author : heibaiying + * @description : + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ProductManager { + + private String name; + + private int age; + + private Date birthday; +} diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/Programmer.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/Programmer.java new file mode 100644 index 0000000..5cff5ab --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/bean/Programmer.java @@ -0,0 +1,25 @@ +package com.heibaiying.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * @author : heibaiying + * @description : + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Programmer { + + private String name; + + private int age; + + private float salary; + + private Date birthday; +} diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/constant/Type.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/constant/Type.java new file mode 100644 index 0000000..71cb4d9 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/constant/Type.java @@ -0,0 +1,12 @@ +package com.heibaiying.constant; + +/** + * @author : heibaiying + * @description : + */ +public interface Type { + + String MANAGER="manager"; + + String PROGRAMMER="programmer"; +} diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/RabbitBaseConfig.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/RabbitBaseConfig.java new file mode 100644 index 0000000..c498ce8 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/RabbitBaseConfig.java @@ -0,0 +1,57 @@ +package com.heibaiying.rabbit; + +import com.heibaiying.rabbit.config.RabbitProperty; +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.ComponentScans; +import org.springframework.context.annotation.Configuration; + +/** + * @author : heibaiying + * @description : 声明队列、交换机、绑定关系、和队列消息监听 + */ + +@Configuration +@ComponentScan("com.heibaiying.rabbit.config") +public class RabbitBaseConfig { + + + /** + * 声明连接工厂 + */ + @Bean + public ConnectionFactory connectionFactory(RabbitProperty property) { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setAddresses(property.getAddresses()); + connectionFactory.setUsername(property.getUsername()); + connectionFactory.setPassword(property.getPassword()); + connectionFactory.setVirtualHost(property.getVirtualhost()); + return connectionFactory; + } + + /** + * 创建一个管理器(org.springframework.amqp.rabbit.core.RabbitAdmin),用于管理交换,队列和绑定。 + * auto-startup 指定是否自动声明上下文中的队列,交换和绑定, 默认值为true。 + */ + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setAutoStartup(true); + return rabbitAdmin; + } + + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + return rabbitTemplate; + } +} diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/MessageDelegate.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/MessageDelegate.java new file mode 100644 index 0000000..85ab4f6 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/MessageDelegate.java @@ -0,0 +1,20 @@ +package com.heibaiying.rabbit.config; + +import com.heibaiying.bean.ProductManager; +import com.heibaiying.bean.Programmer; + +/** + * @author : heibaiying + * @description :消息委派处理类 + */ +public class MessageDelegate { + + public void handleMessage(ProductManager manager) { + System.out.println("收到一个产品经理" + manager); + } + + public void handleMessage(Programmer programmer) { + System.out.println("收到一个程序员" + programmer); + } + +} diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitBaseAnnotation.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitBaseAnnotation.java new file mode 100644 index 0000000..6afcfe9 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitBaseAnnotation.java @@ -0,0 +1,106 @@ +package com.heibaiying.rabbit.config; + +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +/** + * @author : heibaiying + * @description : 声明队列、交换机、绑定关系、和队列消息监听 + */ + +@Configuration +public class RabbitBaseAnnotation { + + @Bean + public TopicExchange exchange() { + // 创建一个持久化的交换机 + return new TopicExchange("topic01", true, false); + } + + @Bean + public Queue firstQueue() { + // 创建一个持久化的队列1 + return new Queue("FirstQueue", true); + } + + @Bean + public Queue secondQueue() { + // 创建一个持久化的队列2 + return new Queue("SecondQueue", true); + } + + /** + * BindingKey 中可以存在两种特殊的字符串“#”和“*”,其中“*”用于匹配一个单词,“#”用于匹配零个或者多个单词 + * 这里我们声明三个绑定关系用于测试topic这种类型交换器 + */ + @Bean + public Binding orange() { + return BindingBuilder.bind(firstQueue()).to(exchange()).with("*.orange.*"); + } + + @Bean + public Binding rabbit() { + return BindingBuilder.bind(secondQueue()).to(exchange()).with("*.*.rabbit"); + } + + @Bean + public Binding lazy() { + return BindingBuilder.bind(secondQueue()).to(exchange()).with("lazy.#"); + } + + + /*创建队列1消费者监听*/ + @Bean + public SimpleMessageListenerContainer firstQueueLister(ConnectionFactory connectionFactory) { + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + // 设置监听的队列 + container.setQueues(firstQueue()); + // 指定要创建的并发使用者数。 + container.setConcurrentConsumers(1); + // 设置消费者数量的上限 + container.setMaxConcurrentConsumers(5); + // 设置是否自动签收消费 为保证消费被成功消费,建议手工签收 + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + container.setMessageListener(new ChannelAwareMessageListener() { + @Override + public void onMessage(Message message, Channel channel) throws Exception { + // 可以在这个地方得到消息额外属性 + MessageProperties properties = message.getMessageProperties(); + //得到消息体内容 + byte[] body = message.getBody(); + System.out.println(firstQueue().getName() + "收到消息:" + new String(body)); + //第二个参数 代表是否一次签收多条 + channel.basicAck(properties.getDeliveryTag(), false); + } + }); + return container; + } + + + /*创建队列2消费者监听*/ + @Bean + public SimpleMessageListenerContainer secondQueueLister(ConnectionFactory connectionFactory) { + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setQueues(secondQueue()); + container.setMessageListener(new ChannelAwareMessageListener() { + @Override + public void onMessage(Message message, Channel channel) throws Exception { + byte[] body = message.getBody(); + System.out.println(secondQueue().getName() + "收到消息:" + new String(body)); + } + }); + return container; + } + +} diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitObjectAnnotation.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitObjectAnnotation.java new file mode 100644 index 0000000..ea58952 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitObjectAnnotation.java @@ -0,0 +1,75 @@ +package com.heibaiying.rabbit.config; + +import com.heibaiying.constant.Type; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; +import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author : heibaiying + * @description : 声明队列、交换机、绑定关系、用于测试对象的消息传递 + */ + +@Configuration +public class RabbitObjectAnnotation { + + + @Bean + public DirectExchange objectTopic() { + // 创建一个持久化的交换机 + return new DirectExchange("objectTopic", true, false); + } + + @Bean + public Queue objectQueue() { + // 创建一个持久化的队列 + return new Queue("objectQueue", true); + } + + @Bean + public Binding binding() { + return BindingBuilder.bind(objectQueue()).to(objectTopic()).with("object"); + } + + + /*创建队列消费者监听*/ + @Bean + public SimpleMessageListenerContainer objectQueueLister(ConnectionFactory connectionFactory) { + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + // 设置监听的队列 + container.setQueues(objectQueue()); + // 将监听到的消息委派给实际的处理类 + MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); + // 指定由哪个方法来处理消息 默认就是handleMessage + adapter.setDefaultListenerMethod("handleMessage"); + + // 消息转换 + Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); + DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); + + Map> idClassMapping = new HashMap<>(); + // 针对不同的消息体调用不同的重载方法 + idClassMapping.put(Type.MANAGER, com.heibaiying.bean.ProductManager.class); + idClassMapping.put(Type.PROGRAMMER, com.heibaiying.bean.Programmer.class); + + javaTypeMapper.setIdClassMapping(idClassMapping); + + jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); + adapter.setMessageConverter(jackson2JsonMessageConverter); + container.setMessageListener(adapter); + return container; + } + +} diff --git a/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitProperty.java b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitProperty.java new file mode 100644 index 0000000..4ca8280 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/java/com/heibaiying/rabbit/config/RabbitProperty.java @@ -0,0 +1,29 @@ +package com.heibaiying.rabbit.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; + +/** + * @author : heibaiying + * @description : rabbit 属性配置 + */ +@Data +@PropertySource(value = "classpath:rabbitmq.properties") +@Configuration +public class RabbitProperty { + + + @Value("${rabbitmq.addresses}") + private String addresses; + + @Value("${rabbitmq.username}") + private String username; + + @Value("${rabbitmq.password}") + private String password; + + @Value("${rabbitmq.virtualhost}") + private String virtualhost; +} diff --git a/spring/spring-rabbitmq-annotation/src/main/resources/rabbitmq.properties b/spring/spring-rabbitmq-annotation/src/main/resources/rabbitmq.properties new file mode 100644 index 0000000..79aab69 --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/main/resources/rabbitmq.properties @@ -0,0 +1,5 @@ +rabbitmq.addresses=localhost:5672 +rabbitmq.username=guest +rabbitmq.password=guest +# Ϊռ ĬΪ/ ͼν߹̨ܿ 򲻻Զһ׳쳣 +rabbitmq.virtualhost=/ \ No newline at end of file diff --git a/spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java b/spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java new file mode 100644 index 0000000..7b53d0b --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java @@ -0,0 +1,58 @@ +package com.heibaiying.rabbit; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.heibaiying.bean.ProductManager; +import com.heibaiying.bean.Programmer; +import com.heibaiying.constant.Type; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.Date; + +/** + * @author : heibaiying + * @description : 传输对象 + */ + +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = RabbitBaseConfig.class) +public class RabbitSendObjectTest { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Test + public void sendProgrammer() throws JsonProcessingException { + MessageProperties messageProperties = new MessageProperties(); + //必须设置 contentType为 application/json + messageProperties.setContentType("application/json"); + // 必须指定类型 + messageProperties.getHeaders().put("__TypeId__", Type.PROGRAMMER); + Programmer programmer = new Programmer("xiaoming", 34, 52200.21f, new Date()); + // 序列化与反序列化都使用的Jackson + ObjectMapper mapper = new ObjectMapper(); + String programmerJson = mapper.writeValueAsString(programmer); + Message message = new Message(programmerJson.getBytes(), messageProperties); + rabbitTemplate.send("objectTopic", "object", message); + } + + + @Test + public void sendProductManager() throws JsonProcessingException { + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("application/json"); + messageProperties.getHeaders().put("__TypeId__", Type.MANAGER); + ProductManager manager = new ProductManager("xiaohong", 21, new Date()); + ObjectMapper mapper = new ObjectMapper(); + String managerJson = mapper.writeValueAsString(manager); + Message message = new Message(managerJson.getBytes(), messageProperties); + rabbitTemplate.send("objectTopic", "object", message); + } +} diff --git a/spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitTest.java b/spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitTest.java new file mode 100644 index 0000000..b267f5e --- /dev/null +++ b/spring/spring-rabbitmq-annotation/src/test/java/com/heibaiying/rabbit/RabbitTest.java @@ -0,0 +1,51 @@ +package com.heibaiying.rabbit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author : heibaiying + * @description : 传输简单字符串 + */ + +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = RabbitBaseConfig.class) +public class RabbitTest { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Test + public void sendMessage() { + MessageProperties properties = new MessageProperties(); + + String allReceived = "我的路由键 quick.orange.rabbit 符合queue1 和 queue2 的要求,我应该被两个监听器接收到"; + Message message1 = new Message(allReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "quick.orange.rabbit", message1); + + String firstReceived = "我的路由键 quick.orange.fox 只符合queue1 的要求,只能被queue 1 接收到"; + Message message2 = new Message(firstReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "quick.orange.fox", message2); + + String secondReceived = "我的路由键 lazy.brown.fox 只符合queue2 的要求,只能被queue 2 接收到"; + Message message3 = new Message(secondReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "lazy.brown.fox", message3); + + String notReceived = "我的路由键 quick.brown.fox 不符合 topic1 任何绑定队列的要求,你将看不到我"; + Message message4 = new Message(notReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "quick.brown.fox", message4); + + /* + * SecondQueue收到消息:我的路由键 quick.orange.rabbit 符合queue1 和 queue2 的要求,我应该被两个监听器接收到 + * FirstQueue收到消息:我的路由键 quick.orange.rabbit 符合queue1 和 queue2 的要求,我应该被两个监听器接收到 + * FirstQueue收到消息:我的路由键 quick.orange.fox 只符合queue1 的要求,只能被queue 1 接收到 + * SecondQueue收到消息:我的路由键 lazy.brown.fox 只符合queue2 的要求,只能被queue 2 接收到 + */ + } +} diff --git a/spring/spring-rabbitmq/pom.xml b/spring/spring-rabbitmq/pom.xml new file mode 100644 index 0000000..4bac506 --- /dev/null +++ b/spring/spring-rabbitmq/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + com.heibaiying + spring-rabbitmq + 1.0-SNAPSHOT + + + 5.1.3.RELEASE + 1.8 + 1.8 + + + + + org.springframework + spring-context + ${spring-base-version} + + + org.springframework + spring-beans + ${spring-base-version} + + + org.springframework + spring-core + ${spring-base-version} + + + org.springframework + spring-web + ${spring-base-version} + + + org.springframework + spring-webmvc + ${spring-base-version} + + + + org.springframework.amqp + spring-rabbit + 2.1.2.RELEASE + + + + com.fasterxml.jackson.core + jackson-databind + 2.9.8 + + + + junit + junit + 4.12 + test + + + org.springframework + spring-test + ${spring-base-version} + test + + + org.projectlombok + lombok + 1.18.4 + provided + + + + + spring-rabbitmq + + + src/main/resources + + + src/main/java + + + + + \ No newline at end of file diff --git a/spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/ProductManager.java b/spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/ProductManager.java new file mode 100644 index 0000000..27e62f5 --- /dev/null +++ b/spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/ProductManager.java @@ -0,0 +1,23 @@ +package com.heibaiying.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * @author : heibaiying + * @description : + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ProductManager { + + private String name; + + private int age; + + private Date birthday; +} diff --git a/spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/Programmer.java b/spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/Programmer.java new file mode 100644 index 0000000..5cff5ab --- /dev/null +++ b/spring/spring-rabbitmq/src/main/java/com/heibaiying/bean/Programmer.java @@ -0,0 +1,25 @@ +package com.heibaiying.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * @author : heibaiying + * @description : + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Programmer { + + private String name; + + private int age; + + private float salary; + + private Date birthday; +} diff --git a/spring/spring-rabbitmq/src/main/java/com/heibaiying/constant/Type.java b/spring/spring-rabbitmq/src/main/java/com/heibaiying/constant/Type.java new file mode 100644 index 0000000..71cb4d9 --- /dev/null +++ b/spring/spring-rabbitmq/src/main/java/com/heibaiying/constant/Type.java @@ -0,0 +1,12 @@ +package com.heibaiying.constant; + +/** + * @author : heibaiying + * @description : + */ +public interface Type { + + String MANAGER="manager"; + + String PROGRAMMER="programmer"; +} diff --git a/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/MessageDelegate.java b/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/MessageDelegate.java new file mode 100644 index 0000000..83864a3 --- /dev/null +++ b/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/MessageDelegate.java @@ -0,0 +1,20 @@ +package com.heibaiying.rabbit; + +import com.heibaiying.bean.ProductManager; +import com.heibaiying.bean.Programmer; + +/** + * @author : heibaiying + * @description :消息委派处理类 + */ +public class MessageDelegate { + + public void handleMessage(ProductManager manager) { + System.out.println("收到一个产品经理" + manager); + } + + public void handleMessage(Programmer programmer) { + System.out.println("收到一个程序员" + programmer); + } + +} diff --git a/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitBaseAnnotation.java b/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitBaseAnnotation.java new file mode 100644 index 0000000..5581346 --- /dev/null +++ b/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitBaseAnnotation.java @@ -0,0 +1,102 @@ +package com.heibaiying.rabbit; + +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author : heibaiying + * @description : 声明队列、交换机、绑定关系、和队列消息监听 + */ + +@Configuration +public class RabbitBaseAnnotation { + + @Bean + public TopicExchange exchange() { + // 创建一个持久化的交换机 + return new TopicExchange("topic01", true, false); + } + + @Bean + public Queue firstQueue() { + // 创建一个持久化的队列1 + return new Queue("FirstQueue", true); + } + + @Bean + public Queue secondQueue() { + // 创建一个持久化的队列2 + return new Queue("SecondQueue", true); + } + + /** + * BindingKey 中可以存在两种特殊的字符串“#”和“*”,其中“*”用于匹配一个单词,“#”用于匹配零个或者多个单词 + * 这里我们声明三个绑定关系用于测试topic这种类型交换器 + */ + @Bean + public Binding orange() { + return BindingBuilder.bind(firstQueue()).to(exchange()).with("*.orange.*"); + } + + @Bean + public Binding rabbit() { + return BindingBuilder.bind(secondQueue()).to(exchange()).with("*.*.rabbit"); + } + + @Bean + public Binding lazy() { + return BindingBuilder.bind(secondQueue()).to(exchange()).with("lazy.#"); + } + + + /*创建队列1消费者监听*/ + @Bean + public SimpleMessageListenerContainer firstQueueLister(ConnectionFactory connectionFactory) { + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + // 设置监听的队列 + container.setQueues(firstQueue()); + // 指定要创建的并发使用者数。 + container.setConcurrentConsumers(1); + // 设置消费者数量的上限 + container.setMaxConcurrentConsumers(5); + // 设置是否自动签收消费 为保证消费被成功消费,建议手工签收 + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + container.setMessageListener(new ChannelAwareMessageListener() { + @Override + public void onMessage(Message message, Channel channel) throws Exception { + // 可以在这个地方得到消息额外属性 + MessageProperties properties = message.getMessageProperties(); + //得到消息体内容 + byte[] body = message.getBody(); + System.out.println(firstQueue().getName() + "收到消息:" + new String(body)); + //第二个参数 代表是否一次签收多条 + channel.basicAck(properties.getDeliveryTag(), false); + } + }); + return container; + } + + + /*创建队列2消费者监听*/ + @Bean + public SimpleMessageListenerContainer secondQueueLister(ConnectionFactory connectionFactory) { + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setQueues(secondQueue()); + container.setMessageListener(new ChannelAwareMessageListener() { + @Override + public void onMessage(Message message, Channel channel) throws Exception { + byte[] body = message.getBody(); + System.out.println(secondQueue().getName() + "收到消息:" + new String(body)); + } + }); + return container; + } + +} diff --git a/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitObjectAnnotation.java b/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitObjectAnnotation.java new file mode 100644 index 0000000..8e4777b --- /dev/null +++ b/spring/spring-rabbitmq/src/main/java/com/heibaiying/rabbit/RabbitObjectAnnotation.java @@ -0,0 +1,73 @@ +package com.heibaiying.rabbit; + +import com.heibaiying.constant.Type; +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author : heibaiying + * @description : 声明队列、交换机、绑定关系、用于测试对象的消息传递 + */ + +@Configuration +public class RabbitObjectAnnotation { + + @Bean + public DirectExchange objectTopic() { + // 创建一个持久化的交换机 + return new DirectExchange("objectTopic", true, false); + } + + @Bean + public Queue objectQueue() { + // 创建一个持久化的队列 + return new Queue("objectQueue", true); + } + + @Bean + public Binding binding() { + return BindingBuilder.bind(objectQueue()).to(objectTopic()).with("object"); + } + + + /*创建队列消费者监听*/ + @Bean + public SimpleMessageListenerContainer objectQueueLister(ConnectionFactory connectionFactory) { + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + // 设置监听的队列 + container.setQueues(objectQueue()); + // 将监听到的消息委派给实际的处理类 + MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); + // 指定由哪个方法来处理消息 默认就是handleMessage + adapter.setDefaultListenerMethod("handleMessage"); + + // 消息转换 + Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); + DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); + + Map> idClassMapping = new HashMap<>(); + // 针对不同的消息体调用不同的重载方法 + idClassMapping.put(Type.MANAGER, com.heibaiying.bean.ProductManager.class); + idClassMapping.put(Type.PROGRAMMER, com.heibaiying.bean.Programmer.class); + + javaTypeMapper.setIdClassMapping(idClassMapping); + + jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); + adapter.setMessageConverter(jackson2JsonMessageConverter); + container.setMessageListener(adapter); + return container; + } + +} diff --git a/spring/spring-rabbitmq/src/main/resources/rabbitmq.properties b/spring/spring-rabbitmq/src/main/resources/rabbitmq.properties new file mode 100644 index 0000000..79aab69 --- /dev/null +++ b/spring/spring-rabbitmq/src/main/resources/rabbitmq.properties @@ -0,0 +1,5 @@ +rabbitmq.addresses=localhost:5672 +rabbitmq.username=guest +rabbitmq.password=guest +# Ϊռ ĬΪ/ ͼν߹̨ܿ 򲻻Զһ׳쳣 +rabbitmq.virtualhost=/ \ No newline at end of file diff --git a/spring/spring-rabbitmq/src/main/resources/rabbitmq.xml b/spring/spring-rabbitmq/src/main/resources/rabbitmq.xml new file mode 100644 index 0000000..f620f51 --- /dev/null +++ b/spring/spring-rabbitmq/src/main/resources/rabbitmq.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java b/spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java new file mode 100644 index 0000000..db03076 --- /dev/null +++ b/spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitSendObjectTest.java @@ -0,0 +1,58 @@ +package com.heibaiying.rabbit; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.heibaiying.bean.ProductManager; +import com.heibaiying.bean.Programmer; +import com.heibaiying.constant.Type; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.Date; + +/** + * @author : heibaiying + * @description : 传输对象 + */ + +@RunWith(SpringRunner.class) +@ContextConfiguration(locations = "classpath:rabbitmq.xml") +public class RabbitSendObjectTest { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Test + public void sendProgrammer() throws JsonProcessingException { + MessageProperties messageProperties = new MessageProperties(); + //必须设置 contentType为 application/json + messageProperties.setContentType("application/json"); + // 必须指定类型 + messageProperties.getHeaders().put("__TypeId__", Type.PROGRAMMER); + Programmer programmer = new Programmer("xiaoming", 34, 52200.21f, new Date()); + // 序列化与反序列化都使用的Jackson + ObjectMapper mapper = new ObjectMapper(); + String programmerJson = mapper.writeValueAsString(programmer); + Message message = new Message(programmerJson.getBytes(), messageProperties); + rabbitTemplate.send("objectTopic", "object", message); + } + + + @Test + public void sendProductManager() throws JsonProcessingException { + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("application/json"); + messageProperties.getHeaders().put("__TypeId__", Type.MANAGER); + ProductManager manager = new ProductManager("xiaohong", 21, new Date()); + ObjectMapper mapper = new ObjectMapper(); + String managerJson = mapper.writeValueAsString(manager); + Message message = new Message(managerJson.getBytes(), messageProperties); + rabbitTemplate.send("objectTopic", "object", message); + } +} diff --git a/spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitTest.java b/spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitTest.java new file mode 100644 index 0000000..32c408d --- /dev/null +++ b/spring/spring-rabbitmq/src/test/java/com/heibaiying/rabbit/RabbitTest.java @@ -0,0 +1,51 @@ +package com.heibaiying.rabbit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author : heibaiying + * @description : 传输简单字符串 + */ + +@RunWith(SpringRunner.class) +@ContextConfiguration(locations = "classpath:rabbitmq.xml") +public class RabbitTest { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Test + public void sendMessage() { + MessageProperties properties = new MessageProperties(); + + String allReceived = "我的路由键 quick.orange.rabbit 符合queue1 和 queue2 的要求,我应该被两个监听器接收到"; + Message message1 = new Message(allReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "quick.orange.rabbit", message1); + + String firstReceived = "我的路由键 quick.orange.fox 只符合queue1 的要求,只能被queue 1 接收到"; + Message message2 = new Message(firstReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "quick.orange.fox", message2); + + String secondReceived = "我的路由键 lazy.brown.fox 只符合queue2 的要求,只能被queue 2 接收到"; + Message message3 = new Message(secondReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "lazy.brown.fox", message3); + + String notReceived = "我的路由键 quick.brown.fox 不符合 topic1 任何绑定队列的要求,你将看不到我"; + Message message4 = new Message(notReceived.getBytes(), properties); + rabbitTemplate.send("topic01", "quick.brown.fox", message4); + + /* + * SecondQueue收到消息:我的路由键 quick.orange.rabbit 符合queue1 和 queue2 的要求,我应该被两个监听器接收到 + * FirstQueue收到消息:我的路由键 quick.orange.rabbit 符合queue1 和 queue2 的要求,我应该被两个监听器接收到 + * FirstQueue收到消息:我的路由键 quick.orange.fox 只符合queue1 的要求,只能被queue 1 接收到 + * SecondQueue收到消息:我的路由键 lazy.brown.fox 只符合queue2 的要求,只能被queue 2 接收到 + */ + } +}