RabbitMQ——死信队列

时间:2024-03-16 21:29:06
package com.weipch.rabbitmq.dlq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.weipch.util.RabbitMqUtils; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * @Author 方唐镜 * @Create 2024-03-03 13:50 * @Description */ public class Consumer01 { private static final String NORMAL_EXCHANGE = "normal_exchange"; private static final String DEAD_EXCHANGE = "dead_exchange"; private static final String NORMAL_QUEUE = "normal_queue"; private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信交换机和队列 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key"); //声明普通交换机和队列 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //在正常队列中设置死信参数 指定死信交换机和死信路由键 Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", DEAD_EXCHANGE); map.put("x-dead-letter-routing-key", "dead-routing-key"); //最大长度 //map.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, map); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); if (message.contains("5")){ System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝"); //拒绝消息并把消息丢入死信队列 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); }else { System.out.println("Consumer01接收消息:" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {}); } }