新增 spring cloud 用例
This commit is contained in:
		@@ -2,7 +2,6 @@ package com.heibaiying.stream.bean;
 | 
			
		||||
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.EqualsAndHashCode;
 | 
			
		||||
import lombok.NoArgsConstructor;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
package com.heibaiying.stream.controller;
 | 
			
		||||
 | 
			
		||||
import com.heibaiying.stream.bean.Programmer;
 | 
			
		||||
import com.heibaiying.stream.stream.Custom;
 | 
			
		||||
import com.heibaiying.stream.stream.CustomStream;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.messaging.Message;
 | 
			
		||||
import org.springframework.messaging.MessageHeaders;
 | 
			
		||||
@@ -21,29 +21,28 @@ import java.util.Map;
 | 
			
		||||
public class MessageController {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private Custom custom;
 | 
			
		||||
    private CustomStream customStream;
 | 
			
		||||
 | 
			
		||||
    /***
 | 
			
		||||
     * 发送简单消息
 | 
			
		||||
     * 1、发送简单消息
 | 
			
		||||
     */
 | 
			
		||||
   @RequestMapping("sendSimpleMessage")
 | 
			
		||||
   public void sendSimpleMessage() {
 | 
			
		||||
       custom.input().send(MessageBuilder.withPayload("hell spring cloud stream").build());
 | 
			
		||||
       customStream.input().send(MessageBuilder.withPayload("hello spring cloud stream").build());
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    /***
 | 
			
		||||
     * 发送消息体为对象的消息
 | 
			
		||||
     *
 | 
			
		||||
     * 2、发送消息体为对象的消息
 | 
			
		||||
     */
 | 
			
		||||
    @RequestMapping("sendObject")
 | 
			
		||||
    public void sendObject() {
 | 
			
		||||
        Programmer programmer=new Programmer("pro",12,212.2f,new Date());
 | 
			
		||||
        custom.input().send(MessageBuilder.withPayload(programmer).build());
 | 
			
		||||
        customStream.input().send(MessageBuilder.withPayload(programmer).build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 发送带有消息头的消息
 | 
			
		||||
     * 3、发送带有消息头的消息
 | 
			
		||||
     */
 | 
			
		||||
    @RequestMapping("sendWithHeads")
 | 
			
		||||
    public void sendWithHeads() {
 | 
			
		||||
@@ -52,11 +51,11 @@ public class MessageController {
 | 
			
		||||
        map.put("code","868686");
 | 
			
		||||
        MessageHeaders messageHeaders=new MessageHeaders(map);
 | 
			
		||||
        Message<Programmer> message= MessageBuilder.createMessage(programmer,messageHeaders);
 | 
			
		||||
        custom.input().send(message);
 | 
			
		||||
        customStream.input().send(message);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 条件消息 可以看做是消息路由键的一种实现
 | 
			
		||||
     * 4、条件消息 可以看做是消息路由键的一种实现
 | 
			
		||||
     */
 | 
			
		||||
    @RequestMapping("sendWithKey")
 | 
			
		||||
    public void sendWithKey() {
 | 
			
		||||
@@ -66,13 +65,29 @@ public class MessageController {
 | 
			
		||||
        map.put("key","01");
 | 
			
		||||
        MessageHeaders messageHeaders=new MessageHeaders(map);
 | 
			
		||||
        Message<Programmer> message= MessageBuilder.createMessage(programmer,messageHeaders);
 | 
			
		||||
        custom.input().send(message);
 | 
			
		||||
        customStream.input().send(message);
 | 
			
		||||
 | 
			
		||||
        // 创建消息头key 为 02 的消息
 | 
			
		||||
        programmer.setName("key02");
 | 
			
		||||
        map.put("key","02");
 | 
			
		||||
        MessageHeaders messageHeaders02=new MessageHeaders(map);
 | 
			
		||||
        Message<Programmer> message02= MessageBuilder.createMessage(programmer,messageHeaders02);
 | 
			
		||||
        custom.input().send(message02);
 | 
			
		||||
        customStream.input().send(message02);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 5、消息转发
 | 
			
		||||
     */
 | 
			
		||||
    @RequestMapping("forward")
 | 
			
		||||
    public void forward(){
 | 
			
		||||
        customStream.input().send(MessageBuilder.withPayload("hello spring cloud stream").build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 5、直接往output发消息
 | 
			
		||||
     */
 | 
			
		||||
    @RequestMapping("toOutPut")
 | 
			
		||||
    public void toOutPut(){
 | 
			
		||||
        customStream.output().send(MessageBuilder.withPayload("direct to output channel").build());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -8,15 +8,15 @@ import org.springframework.messaging.SubscribableChannel;
 | 
			
		||||
/**
 | 
			
		||||
 * @author : heibaiying
 | 
			
		||||
 */
 | 
			
		||||
public interface Custom {
 | 
			
		||||
public interface CustomStream {
 | 
			
		||||
 | 
			
		||||
    String INPUT = "input";
 | 
			
		||||
    String OUTPUT = "output";
 | 
			
		||||
    String INPUT = "customInput";
 | 
			
		||||
    String OUTPUT = "customOutput";
 | 
			
		||||
 | 
			
		||||
    @Input(Custom.INPUT)
 | 
			
		||||
    @Input(CustomStream.INPUT)
 | 
			
		||||
    SubscribableChannel input();
 | 
			
		||||
 | 
			
		||||
    @Output(Custom.OUTPUT)
 | 
			
		||||
    @Output(CustomStream.OUTPUT)
 | 
			
		||||
    MessageChannel output();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -7,6 +7,7 @@ import org.springframework.cloud.stream.annotation.StreamListener;
 | 
			
		||||
import org.springframework.messaging.handler.annotation.Header;
 | 
			
		||||
import org.springframework.messaging.handler.annotation.Headers;
 | 
			
		||||
import org.springframework.messaging.handler.annotation.Payload;
 | 
			
		||||
import org.springframework.messaging.handler.annotation.SendTo;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
@@ -14,27 +15,29 @@ import java.util.Map;
 | 
			
		||||
/**
 | 
			
		||||
 * @author : heibaiying
 | 
			
		||||
 * @description :消息的监听
 | 
			
		||||
 * 注意:  测试这个类的时候需要注释掉不必要的监听,因为对同一个通道存在多个监听,任何一个通道都不能存在返回值(如果有返回值应该指定出站目标) 我们下面的forward消息转发的方法是有返回值的
 | 
			
		||||
 * 否则会抛出异常: IllegalArgumentException: If multiple @StreamListener methods are listening to the same binding target, none of them may return a value
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@EnableBinding(Custom.class)
 | 
			
		||||
@EnableBinding(CustomStream.class)
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class StreamReceived {
 | 
			
		||||
 | 
			
		||||
    @StreamListener(value = Custom.INPUT)
 | 
			
		||||
    @StreamListener(value = CustomStream.INPUT)
 | 
			
		||||
    public void simple(Object payload) {
 | 
			
		||||
        log.info("收到简单消息: {}", payload);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @StreamListener(value = Custom.INPUT)
 | 
			
		||||
    @StreamListener(value = CustomStream.INPUT)
 | 
			
		||||
    public void object(Programmer programmer) {
 | 
			
		||||
        log.info("收到对象消息: {}", programmer);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
    /*
 | 
			
		||||
     * 用 @Header 监听时候需要注意,指定名称的属性必须在消息头中存在 不然就会抛出异常 MessageHandlingException: Missing header 'XXXX' for method parameter type [class java.lang.String]
 | 
			
		||||
     */
 | 
			
		||||
    @StreamListener(value = Custom.INPUT)
 | 
			
		||||
    @StreamListener(value = CustomStream.INPUT)
 | 
			
		||||
    public void heads(@Payload Programmer programmer, @Headers Map<String, Object> map, @Header(name = "code") String code) {
 | 
			
		||||
        log.info("收到对象消息: {}", programmer);
 | 
			
		||||
        map.forEach((key, value) -> {
 | 
			
		||||
@@ -44,19 +47,35 @@ public class StreamReceived {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
    /*
 | 
			
		||||
     * 监听消息头key = 01 的消息
 | 
			
		||||
     */
 | 
			
		||||
    @StreamListener(target = Custom.INPUT, condition = "headers['key']=='01'")
 | 
			
		||||
    @StreamListener(target = CustomStream.INPUT, condition = "headers['key']=='01'")
 | 
			
		||||
    public void key01(@Payload Programmer programmer) {
 | 
			
		||||
        log.info("key01 监听器接收到消息: {}", programmer.getName());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
    /*
 | 
			
		||||
     * 监听消息头key = 02 的消息
 | 
			
		||||
     */
 | 
			
		||||
    @StreamListener(target = Custom.INPUT, condition = "headers['key']=='01'")
 | 
			
		||||
    @StreamListener(target = CustomStream.INPUT, condition = "headers['key']=='01'")
 | 
			
		||||
    public void key02(@Payload Programmer programmer) {
 | 
			
		||||
        log.info("key02 监听器接收到消息: {}", programmer.getName());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 消息转发
 | 
			
		||||
     */
 | 
			
		||||
    @StreamListener(target = CustomStream.INPUT)
 | 
			
		||||
    @SendTo(CustomStream.OUTPUT)
 | 
			
		||||
    public String forward(String payload){
 | 
			
		||||
        log.info("input forward: {}",payload);
 | 
			
		||||
        return "forward "+payload;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @StreamListener(target = CustomStream.OUTPUT)
 | 
			
		||||
    public void outSimpleListen(String payload){
 | 
			
		||||
        log.info("output 收到简单消息: {}", payload);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -3,4 +3,13 @@ spring:
 | 
			
		||||
    host: localhost
 | 
			
		||||
    port: 5672
 | 
			
		||||
    username: guest
 | 
			
		||||
    password: guest
 | 
			
		||||
    password: guest
 | 
			
		||||
  # 指定消息的分组 默认交换机的类型是topic,发送消息是广播的方式,
 | 
			
		||||
  # 所有客户端都会受到消息,指定分组后只有一个客户端可以接收到消息
 | 
			
		||||
  cloud:
 | 
			
		||||
    stream:
 | 
			
		||||
      bindings:
 | 
			
		||||
       # 和我们在接口中声明消息名称的保持一致
 | 
			
		||||
       customInput:
 | 
			
		||||
       # 分组名称任意填写即可
 | 
			
		||||
        group: gro
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user