diff --git a/pictures/stream-binding-double.png b/pictures/stream-binding-double.png new file mode 100644 index 0000000..44e29f5 Binary files /dev/null and b/pictures/stream-binding-double.png differ diff --git a/pictures/stream-binding.png b/pictures/stream-binding.png new file mode 100644 index 0000000..f5d36af Binary files /dev/null and b/pictures/stream-binding.png differ diff --git a/pictures/stream-connections.png b/pictures/stream-connections.png new file mode 100644 index 0000000..107db8e Binary files /dev/null and b/pictures/stream-connections.png differ diff --git a/pictures/stream-exchanges.png b/pictures/stream-exchanges.png new file mode 100644 index 0000000..184166c Binary files /dev/null and b/pictures/stream-exchanges.png differ diff --git a/pictures/stream-queues.png b/pictures/stream-queues.png new file mode 100644 index 0000000..2781ee4 Binary files /dev/null and b/pictures/stream-queues.png differ diff --git a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/bean/Programmer.java b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/bean/Programmer.java index a098dc9..892da29 100644 --- a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/bean/Programmer.java +++ b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/bean/Programmer.java @@ -2,7 +2,6 @@ package com.heibaiying.stream.bean; import lombok.AllArgsConstructor; import lombok.Data; -import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import java.io.Serializable; diff --git a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/controller/MessageController.java b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/controller/MessageController.java index e7207da..1a4d671 100644 --- a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/controller/MessageController.java +++ b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/controller/MessageController.java @@ -1,7 +1,7 @@ package com.heibaiying.stream.controller; import com.heibaiying.stream.bean.Programmer; -import com.heibaiying.stream.stream.Custom; +import com.heibaiying.stream.stream.CustomStream; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -21,29 +21,28 @@ import java.util.Map; public class MessageController { @Autowired - private Custom custom; + private CustomStream customStream; /*** - * 发送简单消息 + * 1、发送简单消息 */ @RequestMapping("sendSimpleMessage") public void sendSimpleMessage() { - custom.input().send(MessageBuilder.withPayload("hell spring cloud stream").build()); + customStream.input().send(MessageBuilder.withPayload("hello spring cloud stream").build()); } /*** - * 发送消息体为对象的消息 - * + * 2、发送消息体为对象的消息 */ @RequestMapping("sendObject") public void sendObject() { Programmer programmer=new Programmer("pro",12,212.2f,new Date()); - custom.input().send(MessageBuilder.withPayload(programmer).build()); + customStream.input().send(MessageBuilder.withPayload(programmer).build()); } /** - * 发送带有消息头的消息 + * 3、发送带有消息头的消息 */ @RequestMapping("sendWithHeads") public void sendWithHeads() { @@ -52,11 +51,11 @@ public class MessageController { map.put("code","868686"); MessageHeaders messageHeaders=new MessageHeaders(map); Message message= MessageBuilder.createMessage(programmer,messageHeaders); - custom.input().send(message); + customStream.input().send(message); } /** - * 条件消息 可以看做是消息路由键的一种实现 + * 4、条件消息 可以看做是消息路由键的一种实现 */ @RequestMapping("sendWithKey") public void sendWithKey() { @@ -66,13 +65,29 @@ public class MessageController { map.put("key","01"); MessageHeaders messageHeaders=new MessageHeaders(map); Message message= MessageBuilder.createMessage(programmer,messageHeaders); - custom.input().send(message); + customStream.input().send(message); // 创建消息头key 为 02 的消息 programmer.setName("key02"); map.put("key","02"); MessageHeaders messageHeaders02=new MessageHeaders(map); Message message02= MessageBuilder.createMessage(programmer,messageHeaders02); - custom.input().send(message02); + customStream.input().send(message02); + } + + /** + * 5、消息转发 + */ + @RequestMapping("forward") + public void forward(){ + customStream.input().send(MessageBuilder.withPayload("hello spring cloud stream").build()); + } + + /** + * 5、直接往output发消息 + */ + @RequestMapping("toOutPut") + public void toOutPut(){ + customStream.output().send(MessageBuilder.withPayload("direct to output channel").build()); } } diff --git a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/Custom.java b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/CustomStream.java similarity index 68% rename from spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/Custom.java rename to spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/CustomStream.java index edf972e..1224468 100644 --- a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/Custom.java +++ b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/CustomStream.java @@ -8,15 +8,15 @@ import org.springframework.messaging.SubscribableChannel; /** * @author : heibaiying */ -public interface Custom { +public interface CustomStream { - String INPUT = "input"; - String OUTPUT = "output"; + String INPUT = "customInput"; + String OUTPUT = "customOutput"; - @Input(Custom.INPUT) + @Input(CustomStream.INPUT) SubscribableChannel input(); - @Output(Custom.OUTPUT) + @Output(CustomStream.OUTPUT) MessageChannel output(); } diff --git a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/StreamReceived.java b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/StreamReceived.java index 55f100d..759f4f4 100644 --- a/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/StreamReceived.java +++ b/spring-cloud/spring-cloud-stream/src/main/java/com/heibaiying/stream/stream/StreamReceived.java @@ -7,6 +7,7 @@ import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; import java.util.Map; @@ -14,27 +15,29 @@ import java.util.Map; /** * @author : heibaiying * @description :消息的监听 + * 注意: 测试这个类的时候需要注释掉不必要的监听,因为对同一个通道存在多个监听,任何一个通道都不能存在返回值(如果有返回值应该指定出站目标) 我们下面的forward消息转发的方法是有返回值的 + * 否则会抛出异常: IllegalArgumentException: If multiple @StreamListener methods are listening to the same binding target, none of them may return a value */ @Component -@EnableBinding(Custom.class) +@EnableBinding(CustomStream.class) @Slf4j public class StreamReceived { - @StreamListener(value = Custom.INPUT) + @StreamListener(value = CustomStream.INPUT) public void simple(Object payload) { log.info("收到简单消息: {}", payload); } - @StreamListener(value = Custom.INPUT) + @StreamListener(value = CustomStream.INPUT) public void object(Programmer programmer) { log.info("收到对象消息: {}", programmer); } - /** + /* * 用 @Header 监听时候需要注意,指定名称的属性必须在消息头中存在 不然就会抛出异常 MessageHandlingException: Missing header 'XXXX' for method parameter type [class java.lang.String] */ - @StreamListener(value = Custom.INPUT) + @StreamListener(value = CustomStream.INPUT) public void heads(@Payload Programmer programmer, @Headers Map map, @Header(name = "code") String code) { log.info("收到对象消息: {}", programmer); map.forEach((key, value) -> { @@ -44,19 +47,35 @@ public class StreamReceived { } - /** + /* * 监听消息头key = 01 的消息 */ - @StreamListener(target = Custom.INPUT, condition = "headers['key']=='01'") + @StreamListener(target = CustomStream.INPUT, condition = "headers['key']=='01'") public void key01(@Payload Programmer programmer) { log.info("key01 监听器接收到消息: {}", programmer.getName()); } - /** + /* * 监听消息头key = 02 的消息 */ - @StreamListener(target = Custom.INPUT, condition = "headers['key']=='01'") + @StreamListener(target = CustomStream.INPUT, condition = "headers['key']=='01'") public void key02(@Payload Programmer programmer) { log.info("key02 监听器接收到消息: {}", programmer.getName()); } + + /** + * 消息转发 + */ + @StreamListener(target = CustomStream.INPUT) + @SendTo(CustomStream.OUTPUT) + public String forward(String payload){ + log.info("input forward: {}",payload); + return "forward "+payload; + } + + @StreamListener(target = CustomStream.OUTPUT) + public void outSimpleListen(String payload){ + log.info("output 收到简单消息: {}", payload); + } + } diff --git a/spring-cloud/spring-cloud-stream/src/main/resources/application.yml b/spring-cloud/spring-cloud-stream/src/main/resources/application.yml index 37bb938..7858ee9 100644 --- a/spring-cloud/spring-cloud-stream/src/main/resources/application.yml +++ b/spring-cloud/spring-cloud-stream/src/main/resources/application.yml @@ -3,4 +3,13 @@ spring: host: localhost port: 5672 username: guest - password: guest \ No newline at end of file + password: guest + # 指定消息的分组 默认交换机的类型是topic,发送消息是广播的方式, + # 所有客户端都会受到消息,指定分组后只有一个客户端可以接收到消息 + cloud: + stream: + bindings: + # 和我们在接口中声明消息名称的保持一致 + customInput: + # 分组名称任意填写即可 + group: gro