RabbitMQ发布与订阅模式类型

时间:2022-12-28 07:55:26

????博客主页:????不会压弯的小飞侠
✨欢迎关注:????点赞????收藏⭐留言✒
✨系列专栏:????Linux专栏
????欢迎大佬指正,一起学习!一起加油!

RabbitMQ发布与订阅模式类型


????模式说明

  • 工作队列背后的假设是每个任务都是 只交付给一名工人。在这一部分中,我们将做一些事情 完全不同的 - 我们将向多个传递消息 消费者。此模式称为“发布/订阅”。
    RabbitMQ发布与订阅模式类型

  • 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

????发布与订阅模式完成消息传递

  • 编写生产者发送消息
    • 编写消息生产者 Producter
public class Producer {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
        String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "此消息被交换机发到两个队列中!!!";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();
    }
}

  • 测试发布者

RabbitMQ发布与订阅模式类型

  • 编写消费者接收消息
    • 编写消息消费者Consumer1
public class Consumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue1Name = "test_fanout_queue1";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume(queue1Name,true,consumer);
    }
}
  • 编写消费者接收消息
    • 编写消息消费者Consumer2
public class Consumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue2Name = "test_fanout_queue2";
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume(queue2Name,true,consumer);
    }
}
  • 测试
  • 启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
    RabbitMQ发布与订阅模式类型
    RabbitMQ发布与订阅模式类型

????总结

  • 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机