RabbitMQ

时间:2022-12-18 16:54:48

异步通信

异步通信的优点

  • 耦合度低
  • 吞吐量提升
  • 故障隔离
  • 流量削峰

异步通信的缺点

  • 依赖于Broker的可靠性、安全性、吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

RabbitMQ安装

RabbitMQ

RabbitMQ中的几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息的队列
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

默认15672端口是

RabbitMQ模型

1.基本消息队列


public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.239.131");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}
public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.239.131");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

基本消息队列的消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()5.利用channel将消费者与队列绑定

SpringAMQP

1.什么是SpringAMQP

  1. 什么是AMQP?
    应用间消息通信的一种协议,与语言和平台无关。

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

  2. springAMQP如何发送消息?

    1. 引入amqp的starter依赖
    2. 配置RabbitMQ地址
    3. 利用RabbitTemplate的convertAndSend方法

2.用SpringAMQP简化基本消息队列

在父工程中引入spring-amqp的依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

1.springAMQP发送消息

1. 配置RabbitMQ地址
spring:
 rabbitmq:
   host: 192.168.239.131
   port: 5672
   username: itcast
   password: 123321
   virtual-host: /
2. 利用RabbitTemplate的convertAndSend方法
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend2SimpleQueue(){
        String queueName = "simple.queue";
        String message = "hello!,SpringAMQP!";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列。

2.springAMQP接收消息

在consumer服务中编写消费逻辑,绑定simple.queue这个队列

1. 配置RabbitMQ地址
2.写一个监听类监听消息队列
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到了【" + msg + "】");
    }
}

3.工作队列

生产者生产50个消息:

    @Test
    public void testSend2WorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello!,message__!";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message+i);
            Thread.sleep(20);
        }
    }

模拟两个消费能力不同的消费者:一个1s消费50个,一个1s消费10个。让这两个消费者绑定到同一个队列:

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者接收到了[" + msg + "]"+LocalDateTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者接收到了[" + msg + "]"+ LocalDateTime.now());
        Thread.sleep(200);
    }

work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

4.发布订阅队列模型-fanoutExchange

1.在consumer服务声明Exchange、Queue、Binding

@Configuration
public class FanoutConfig {
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("kxy.fanoutExchange");
    }
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("kxy.fanoutQueue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("kxy.fanoutQueue2");
    }
    @Bean
    public Binding bindingQueue1(FanoutExchange fanoutExchange,Queue fanoutQueue1){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }   
    @Bean
    public Binding bindingQueue2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

2.在consumer服务声明两个消费者

@Component
public class RabbitMQListener {
    @RabbitListener(queues = "kxy.fanoutQueue1")
    public void listenFanoutQueue1(String msg) {
        System.err.println("消费者1接收到了[" + msg + "]");
    }
    @RabbitListener(queues = "kxy.fanoutQueue2")
    public void listenFanoutQueue2(String msg) {
        System.err.println("消费者2接收到了[" + msg + "]");
    }
}

3.在publisher服务发送消息到FanoutExchange

    @Test
    public void testSend2fanoutQueue(){
        String exchangeName = "kxy.fanoutExchange";
        String message = "hello!,everyone!";
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }

总结:
交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列·不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

5.发布订阅队列模型–DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式( routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

RabbitMQ
1.在消费者服务里声明两个队列,以及队列的key

@Component
public class RabbitMQListener {
	@RabbitListener(bindings = @QueueBinding(
            value = @Queue("kxy.directQueue1"),
            exchange = @Exchange(name = "kxy.directExchange",type = ExchangeTypes.DIRECT),
            key = {"blue","red"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者1接收到了[" + msg + "]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("kxy.directQueue2"),
            exchange = @Exchange(name = "kxy.directExchange",type = ExchangeTypes.DIRECT),
            key = {"yellow","red"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到了[" + msg + "]");
    }
}

2.生产者发布消息到交换机,并指定相应的key,到不同的路由

    public void testSend2DirectQueue(){
        String exchangeName = "kxy.directExchange";
        String message = "hello!,blue!";
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    }

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

  • @Queue
  • @Exchange

6.发布订阅队列模型-TopicExchange

#代表多个或0个
*代表恰好一个

1.在消费者服务声明TopicExchange,以及绑定的两个队列,队列的key

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("kxy.topicQueue1"),
            exchange = @Exchange(name = "kxy.topicExchange",type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者1接收到了[" + msg + "]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("kxy.topicQueue2"),
            exchange = @Exchange(name = "kxy.topicExchange",type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者2接收到了[" + msg + "]");
    }

2.生产者服务发布消息,可以通过通配符来决定发送给哪个消费者:

    @Test
    public void testSend2TopicQueue(){
        String exchangeName = "kxy.topicExchange";
        String message = "哈哈哈哈哈哈";
        rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
    }

7.消息转换器

需要用到Jackson来做json的序列化:

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

SpringAMQP中消息的序列化和反序列化是怎么实现的?

  • 利用MessageConverter实现的,默认是JDK的序列化
  • 注意发送方与接收方必须使用相同的MessageConverter