RabbitMQ备份交换机与优先级队列

时间:2024-03-14 22:02:34

1. 备份交换机

备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
在这里插入图片描述

2.修改配置类

    //备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";

    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup.queue";

    //告警队列
    public static final String WARNING_QUEUE_NAME = "warning.queue";
    
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    // 声明确认队列
    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    @Bean
    public Binding backupQueueBindingBackupQueue(@Qualifier("backupQueue") Queue backupQueue,
                                @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding warningQueueBindingBackupQueue(@Qualifier("warningQueue") Queue warningQueue,
                                                 @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }

3. 告警消费者

package com.hong.springboot.rabbitmq.consumer;

import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description: 告警消费者
 * @Author: hong
 * @Date: 2024-03-10 17:54
 * @Version: 1.0
 **/
@Slf4j
@Component
public class WarningConsumer {
    //接收告警消息
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMessage(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},备用交换机收到不可路由信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);
    }
}

先删了confirm.exchange交换机(改了配置)再启动项目,发送http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才
在这里插入图片描述

4. 优先级队列

4.1. 方式1-控制台添加

在这里插入图片描述

4.2. 方式2-代码实现

package com.hong.rabbitmq10;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description: 优先级队列生产者
 * @Author: hong
 * @Date: 2024-03-11 21:11
 * @Version: 1.0
 **/
public class PriorityProducer {
    private static final String QUEUE_NAME = "priority-queue";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.4");
        factory.setUsername("admin");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * 声明消息队列
         * 第一个参数: 队列名称
         * 第二个参数: 是否持久化     false不持久化,rabbitmq重启队列丢失  true持久化,rabbitmq重启队列不丢失
         * 第三个参数: 是否排外       false排外只能一个消费者消费         false不排外可多消费者消费
         * 第四个参数: 是否自动删除   true自动删除最后一个消费者断开连接后该队列自动删除   false不自动删除
         * 第五个参数: 其他参数信息
         */
        Map<String,Object> map = new HashMap<>();
        //设置最大优先级
        map.put("x-max-priority",10);
        channel.queueDeclare(QUEUE_NAME,true,false,false,map);

        for (int i = 1; i < 11; i++) {
            String message = "HELLO RABBITMQ!-------"+i;
            if(i == 5){
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
                channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
            }else{
                /**
                 * 发布消息
                 * 第一个参数:要将消息发送到哪个交换机
                 * 第二个参数:路由的 key 是哪个
                 * 第三个参数:其他参数信息
                 * 第四个参数:消息的消息体
                 */
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            }
        }
        System.out.println("消息发送完成!");
    }
}

在这里插入图片描述
消费者

package com.hong.rabbitmq10;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Description: 优先级队列消费者
 * @Author: hong
 * @Date: 2024-03-11 21:14
 * @Version: 1.0
 **/
public class PriorityConsumer {
    private static final String QUEUE_NAME = "priority-queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.4");
        factory.setUsername("admin");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (comsumerTag, message) -> System.out.println(new String(message.getBody()));
        CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");
        /**
         * 消息消费
         * 第一个参数: 消费哪个队列
         * 第二个参数: 是否自动确认消息
         * 第三个参数: 当一个消息发送过来后的回调接口
         * 第四个参数: 当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

在这里插入图片描述