spring cloud stream 中使用rabbitMQ实现延时队列

时间:2024-04-02 08:10:00

1、首先需要部署环境,这里我把生产者和消费者放在同一个spring boot项目中

2、使用Spring Initializr 新建一个项目,选择spring web和cloud steam

spring cloud stream 中使用rabbitMQ实现延时队列生成项目并导入到idea中。

3、在pom.xml中,需要添加依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

4、在配置文件中配置spring cloud steam相关配置信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

  cloud:
    stream:
      bindings:
        greetingChannel:
          destination : greetings
        greetingChannel_input:
          destination: greetings
          group: group
      rabbit:
        bindings:
          greetingChannel:
            producer:
              delayed-exchange: true #延迟队列

这里需要保证本地安装有rabbitMQ,并且安装有对应的插件rabbitmq-delayed-message-exchange 自行百度实现即可

5、编写接口,在接口中定义消息产生和消息消费的两个方法

public interface HelloBinding {

        @Output("greetingChannel")
        MessageChannel greeting();

        String GREETING = "greetingChannel_input";
        @Input(GREETING)
        MessageChannel greetingInput();
}
这里的output和input中的字符串必须和配置文件中的两个字符串对应(已标红)

6、新建生产者类,绑定刚刚写好的接口类(红色代码)。具体代码如下

@EnableBinding(HelloBinding.class)
@RestController
public class ProducerControl {


    @Autowired
    private HelloBinding binding;

    @GetMapping("/greet/{name}")
    public void publish(@PathVariable String name) {
        MessageChannel greet = binding.greeting();
        String greeting = "Hello, " + name + "!";
        System.out.println("发送前的时间:" + LocalDateTime.now());
        Message<String> msg = MessageBuilder.withPayload(greeting).setHeader("x-delay", 1000 * 60 * 1)
                .build();
        boolean send = greet.send(msg);
        System.out.println("消息发送情况" + send + LocalDateTime.now());
    }
}

在以上代码中 ,定义了一个RestController,然后给方法定义了GetMapping地址,方便测试,接收一个name参数。当接口被调用的时候,传入name值,拼接字符串,然后使用MessageBuilder新建一个消息,用HelloBinding接口提供的方法获取输出通道(使用OutPut注解的方法),然后发送数据出去,这里实现延时效果的主要代码就是setHeader("x-delay", 1000 * 60 * 1),第一个参数不能修改,第二个参数可以设置延时时间,单位为ms,我这里设置了1分钟;另外,我们还需要在配置文件中给生产者设置启用延时,参考配置文件蓝色代码块。

7、到这里,生产者已经搞定,一旦接口被调用,则会产生一个延时1分钟的消息,一分钟后消息会到达rabbitMQ的queen中,这里有一个坑,如果我们设置通道的时候,没有给通道分组(即设置group,绿色部分代码),则发送消息时仍然会有通道,但是一旦停止项目,则这个通道会随之消失,如果在延时消息时间到达之前,项目仍然没有启动,则queen一直都不存在,那么延时消息就没有通道可以走,这个时候,消息就丢失了。所以说需要给channel设置group值,进行分组,分组还有一些其他的好处,大家自行百度即可

8、创建消息消费者,这个类比较简单,我们直接贴代码即可

@EnableBinding(HelloBinding.class)
public class HelloListener {

    @StreamListener(target = HelloBinding.GREETING)
    public void processHelloChannelGreeting(String msg) {
        System.out.println(LocalDateTime.now()+"收到的消息为:"+msg);
    }
}

这里值得注意的就是需要和HelloBinding绑定,然后在方法中设置监听的地址必须和HelloBinding接口的Input注解标注的方法的字符串一致,否则监听的就不是同一个通道了;最后会接收到字符串类型的数据,如果需要传递其他类型对象,将对象序列化为json字符串即可

9、运行当前项目,然后访问http://localhost:8080/greet/john即可产生一个延时消息,一分钟后会进入queen通道,然后被消费者取出并消费

参考链接:https://stackabuse.com/spring-cloud-stream-with-rabbitmq-message-driven-microservices/ (英文)

https://juejin.im/post/5d2201346fb9a07ee4638512 (中文)