rabbitmq 3.6 延时消息

时间:2023-03-09 09:12:56
rabbitmq 3.6 延时消息
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#安装插件 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

生产端

rabbitTemplate.convertAndSend("direct.exchange", "notify", msg, new MessagePostProcessor() {

            @Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(20000);
System.out.println(sdf.format(new Date()) + " Delay sent.");
return message;
}
});

消费端

@Component("delayedReceiver")
@EnableRabbit
@Configuration
public class DelayedReceiver { private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @RabbitListener(queues = "notify.queue")
public void handleMessageNotify(Object object, Channel channel) throws Exception {
Message msg = (Message) object;
System.out.println(sdf.format(new Date()) + " notify :" + new String(msg.getBody())); channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} @Bean
public Exchange direct() {
return ExchangeBuilder.directExchange("direct.exchange").delayed().withArgument("x-delayed-type", "direct")
.build();
} @Bean
public Queue notifyQueue() {
return new Queue("notify.queue");
} @Bean
public Binding bindingNotify(DirectExchange direct, Queue notifyQueue) {
return BindingBuilder.bind(notifyQueue).to(direct).with("notify");
} }