java框架之SpringBoot(12)-消息及整合RabbitMQ

时间:2023-12-09 14:05:37

前言

概述

大多数应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦的能力。

消息服务中两个重要概念:消息代理(message broker)和目的地(destination)。当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地。

消息队列主要有两种形式的目的地:

  • 队列(queue):点对点消息通信(point-to-point)。
  • 主题(topic):发布(publish)/订阅(subscribe)消息通信。

点对点:

  • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列。
  • 消息只有唯一的发送者和接收者,但并不是只能有一个接收者。

发布/订阅:

  • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时接收到消息。

两种规范

JMS(Java Message Service):

  • Java 消息服务,基于 JVM 消息代理的规范。ActiveMQ、HornetMQ 是 JMS 的实现。

AMQP(Advanced Message Queuing Protocol):

  • 高级消息队列协议,也是一个消息代理的规范,兼容 JMS。
  • RabbitMQ 是 AMQP 的实现。
JMS 与 AMQP 对比:
  JMS AMQP
定义 Java API 网络级协议
跨语言
跨平台
Model 提供 2 种消息模型:
  1. Peer-2-Peer
  2. Pub/Sub
提供了 5 种消息模型:
  1. direct exchange
  2. fanout exchange
  3. topic exchange
  4. headers exchange
  5. system exchange

本质来讲,后四种和 JMS 的 Pub/Sub 模型没有太大区别,仅是在路由机制上做了更详细的区分。

支持消息类型 多种消息类型:
  1. TextMessage
  2. MapMessage
  3. BytesMessage
  4. StreamMessage
  5. ObjectMessage
  6. Message(只有消息头和属性)
因其要支持跨语言跨平台,所以仅支持 byte[],当实际应用中有复杂的消息时,可以将消息序列化后发送。
综合 HMS 定义了 Java API 层面的标准,在 Java 体系中,多个 client 均可通过 JMS 进行交互,不需要修改应用代码,但是其对跨平台支持较差。 AMQP 定义了 wire-level 层的协议标准,天然具有跨平台、跨语言特性。

Spring 支持:

  • spring-jms 提供了对 JMS 的支持。
  • spring-rabbit 提供了对 AMQP 的支持。
  • 需使用 ConnectionFactory 的实现来连接消息代理。
  • 提供 JmsTemplate、RabbitTemplate 来操作消息。
  • @JmsListener(JMS)和 @RabbitListener(AMQP)注解标注在方法上可监听消息代理发布的消息。
  • @EnableJms、@EnableRabbit 开启支持。

SpringBoot 自动配置类:

  • JMS 的自动配置类为 JmsAutoConfiguration。
  • AMQP 的自动配置类为 RabbitAutoConfiguration。

几种场景

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:串行方式、并行方式。

1、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

java框架之SpringBoot(12)-消息及整合RabbitMQ

2、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

java框架之SpringBoot(12)-消息及整合RabbitMQ

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是 150 毫秒,并行的时间可能是 100 毫秒。

因为 CPU 在单位时间内处理的请求数是一定的,假设 CPU1 秒内吞吐量是 100 次。则串行方式 1 秒内 CPU 可处理的请求量是 7 次(1000/150)。并行方式处理的请求量是 10 次(1000/100)。

如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)很容易达到瓶颈。

3、引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

java框架之SpringBoot(12)-消息及整合RabbitMQ

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒 20 QPS。比串行提高了 3 倍,比并行提高了 2 倍。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

传统模式:

java框架之SpringBoot(12)-消息及整合RabbitMQ

传统模式的缺点:

  • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。
  • 订单系统与库存系统耦合。

引入消息队列:

java框架之SpringBoot(12)-消息及整合RabbitMQ

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

流量削锋

场景说明:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

java框架之SpringBoot(12)-消息及整合RabbitMQ

  • 可以控制活动的人数。

  • 可以缓解短时间内高流量压垮应用。

  • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

  • 秒杀业务根据消息队列中的请求信息,再做后续处理。

该部分内容摘自 https://blog.csdn.net/cws1214/article/details/52922267

RabbitMQ介绍

简介

RabbitMQ 采用 Erlang 语言开发,是 AMQP 的开源实现。Erlang 语言由 Ericson 设计,专门为开发 concurrent 和 distribution 系统的一种语言,在电信领域使用广泛。OTP(Open Telecom Platform)作为 Erlang 语言的一部分,包含了很多基于 Erlang 开发的中间件/库/工具,如 mnesia/SASL,极大方便了 Erlang 应用的开发。OTP 就类似于 Python 语言中众多的 module,用户借助这些 module 可以很方便的开发应用。

核心概念

java框架之SpringBoot(12)-消息及整合RabbitMQ

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先级)、delivery-mode(标识指定消息是否需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换机发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由到服务器中的队列,也是消息到达 Broker 的第一站,根据分发规则,匹配查询表中的路由键,分发消息到队列中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) 和 fanout (multicast)。

Queue

消息队列,用来保存信息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入到一个或多个队列。消息一致在队列中,等待消费者连接到这个队列将其取走。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。路由器和队列的绑定可以是多对多的关系。

Connection

连接,Publisher/Consumer 和 Broker 之间的 TCP 连接。断开连接的操作只会在 client  端进行,Broker 不会断开连接,除非出现网络故障或 Broker 服务出现问题。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接受消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 '/'。

Broker

表示消息队列服务器实体,即接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。

运行机制

AMQP中的消息路由

AMQP 中的消息路由与 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者需要把消息发布到 Exchange,最终由 Exchange 转发到队列并被消费者接收,而 Binding 就决定了交换器会将消息转发到哪个队列。

java框架之SpringBoot(12)-消息及整合RabbitMQ

Exchange类型

Exchange 分发消息时根据类型的不同分发策略有区别,目前共有四种类型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三中类型:

  • Direct Exchange

    消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,交换器就将消息发送到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"dog",则只转发 routing key 标记为 "dog" 的消息,不会转发 "dog.puppy",也不会转发"dog.guard"等。它是完全匹配、单播的模式。

    java框架之SpringBoot(12)-消息及整合RabbitMQ

  • Fanout Exchange:每个发到 fanout 类型交换器的消息都会分发到所有绑定的队列中。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息时最快的。

    java框架之SpringBoot(12)-消息及整合RabbitMQ

  • Topic Exchange:topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将由路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:"#"和"*"。"#" 匹配 0 个或多个单词,"*" 匹配一个单词。简单说就是根据 routing key 及通配规则将消息分发到目标队列中。

安装

参考【Docker 安装RabbitMQ】。

使用

准备

1、进入 RabbitMQ 的 web 可视化页,用 guest 用户登录,密码也为 guest。

2、新建如下测试队列:

java框架之SpringBoot(12)-消息及整合RabbitMQ

3、新建如下测试交换器:

java框架之SpringBoot(12)-消息及整合RabbitMQ

4、给新建的 direct 和 fanout 交换器新建如下绑定:

java框架之SpringBoot(12)-消息及整合RabbitMQ

5、给新建的 topic 交换器新建如下绑定:

java框架之SpringBoot(12)-消息及整合RabbitMQ

direct交换机测试

1、给“张三.msg”这个队列发送消息:

java框架之SpringBoot(12)-消息及整合RabbitMQ

2、“张三.msg”接收消息:

java框架之SpringBoot(12)-消息及整合RabbitMQ

fanout交换机测试

1、给所有绑定的队列发送消息:

java框架之SpringBoot(12)-消息及整合RabbitMQ

2、所有队列都接收到消息:

java框架之SpringBoot(12)-消息及整合RabbitMQ

topic交换器测试

1、给所有“姓张”的队列发送消息:

java框架之SpringBoot(12)-消息及整合RabbitMQ

2、所有“姓张”的队列都接收到消息:

java框架之SpringBoot(12)-消息及整合RabbitMQ

整合RabbitMQ

准备

1、使用 maven 新建 SpringBoot 项目,引入 Rabbit 、Web 场景启动器。

2、配置 RabbitMQ 连接信息:

spring.rabbitmq.host=192.168.202.136
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

application.properties

3、注解配置启用 RabbitMQ:

package com.springboot.config;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit // 启用 Rabbit
public class MyAmqpConfig {

}

com.springboot.config.MyAmqpConfig

4、新建测试 JavaBean:

package com.springboot.bean;

import java.io.Serializable;
import java.util.Date;

public class User implements Serializable {

    private Integer id;
    private String name;
    private Date birthday;
    private String city;

    public User() {
    }

    public User(Integer id, String name, Date birthday, String city) {
        this.id = id;
        this.name = name;
        this.birthday = birthday;
        this.city = city;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getBirthday() {
        return birthday;
    }

    public void setBirthday(Date birthday) {
        this.birthday = birthday;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", birthday=" + birthday +
                ", city='" + city + '\'' +
                '}';
    }
}

com.springboot.bean.User

RabbitTemplate使用

下面通过 RabbitTemplate 来完成上述 RabbitMQ 在可视化界面中的几个测试操作:

package com.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTests {

    // org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration 自动配置类中注册了 RabbitTemplate 的 bean
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        // 通过 direct 交换器给 “张三.msg” 队列发送消息

        // send 方法的 message 参数中需要自己定义消息头和消息体
        // rabbitTemplate.send(exchange,routingkey,message);

        rabbitTemplate.convertAndSend("my.direct","zhangsan.msg","你好 张三");
    }

    @Test
    public void test2(){
        // 接收 “张三.msg” 队列的消息
        Object o = rabbitTemplate.receiveAndConvert("张三.msg");
        System.out.println(o.toString());

        /*
        你好 张三
         */
    }

    @Test
    public void test3(){
        // 通过 fanout 交换器给所有队列发送消息

        rabbitTemplate.convertAndSend("my.fanout", "zhangsan.msg", "大家好");
    }

    @Test
    public void test4(){
        // 所有队列接收消息
        Object msg1 = rabbitTemplate.receiveAndConvert("张三.msg");
        System.out.println(msg1.toString());
        Object msg2 = rabbitTemplate.receiveAndConvert("张四.msg");
        System.out.println(msg2.toString());
        Object msg3 = rabbitTemplate.receiveAndConvert("李三.msg");
        System.out.println(msg3.toString());
        Object msg4 = rabbitTemplate.receiveAndConvert("李四.msg");
        System.out.println(msg4.toString());

        /*
        大家好
        大家好
        大家好
        大家好
         */
    }

    @Test
    public void test5(){
        // 通过 topic 交换器给所有“姓张”的队列发送消息
        rabbitTemplate.convertAndSend("my.topic", "zhang.hello", "张先生 你好");
    }

    @Test
    public void test6(){
        // 所有“姓张”的队列接收消息
        Object msg1 = rabbitTemplate.receiveAndConvert("张三.msg");
        Object msg2 = rabbitTemplate.receiveAndConvert("张四.msg");

        System.out.println(msg1);
        System.out.println(msg2);

        /*
        张先生 你好
        张先生 你好
         */
    }
}

test

在上述的操作中操作的都是字符串,而通过 RabbitTemplate 是可以直接操作对象的,RabbitTemplate 内部的 Converter 会自动帮我们完成对象的序列化与反序列化:

package com.springboot;

import com.springboot.bean.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.ParseException;
import java.text.SimpleDateFormat;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1() throws ParseException {
        // 直接发送一个对象
        User user = new User(1, "张三", new SimpleDateFormat("yyyy-MM-dd").parse("1998-6-5"), "深圳");
        rabbitTemplate.convertAndSend("my.direct","zhangsan.msg",user);
     java框架之SpringBoot(12)-消息及整合RabbitMQ
    }

    @Test
    public void test2(){
        Object o = rabbitTemplate.receiveAndConvert("张三.msg");
        System.out.println(o.getClass());
        System.out.println(o);

        /*
        class com.springboot.bean.User
        User{id=1, name='张三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}
         */

        // 根据输出结果可以看到,获取的消息自动完成了反序列化转换为 java 对象
    }
}

test

查看 RabbitMQ 服务器中存储的对象,会发现存储的值为 RabbitMQ 以默认消息转换器 org.springframework.amqp.support.converter.SimpleMessageConverter 序列化后的值,如果我们需要存储的消息为 Json 格式,只需要自己注册一个 Json 格式消息转换器到容器即可,而 Spring 已经给我们提供了这个转换器:

package com.springboot.config;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class MyAmqpConfig {
    @Bean
    public MessageConverter messageConverter(){
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        return jackson2JsonMessageConverter;
    }
}

com.springboot.config.MyAmqpConfig

此时再次执行上述操作,查看服务器中存储消息:

java框架之SpringBoot(12)-消息及整合RabbitMQ

消息以转换为 Json 格式。

监听队列-@RabbitListener

Spring 也为我们提供了监听队列支持的注解 @RabbitListener,它能够帮我们很简便的创建一个监听服务,只需要标注在一个存放在 IoC 容器中实例的方法上。看如下示例:

1、创建一个服务类,注册到 IoC 容器,使用 @RabbitListener 注解标注在方法上:

package com.springboot.service;

import com.springboot.bean.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    @RabbitListener(queues = {"张三.msg"})  // 监听指定队列消息
    public void receiveUserMsg(User user) {
        // 接收自动反序列化后的对象
        System.out.println(user);
    }

    @RabbitListener(queues = {"李四.msg"})
    public void receiveMessage(Message message){
        // 接收源消息信息

        // 获得消息体
        System.out.println(message.getBody());
        // 获得消息属性信息
        System.out.println(message.getMessageProperties());
    }
}

com.springboot.service.UserService

2、启动程序,运行单元测试中发送 User 对象方法,监听程序输出如下:

User{id=1, name='张三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}

AmqpAdmin组件

Spring 自动注册了一个 AmqpAdmin 组件,它的作用类似于数据库中的 DDL 语句,可以用来帮我们定义(创建)交换器、队列。如下:

package com.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpAdminTests {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void testDeclareExchange(){
        // 创建一个交换器
        Exchange exchange = new DirectExchange("my.directNew");
        amqpAdmin.declareExchange(exchange);

java框架之SpringBoot(12)-消息及整合RabbitMQ

    }

    @Test
    public void testDeclareQueue(){
        // 创建 Queue
        Queue queue = new Queue("myQueue");
        amqpAdmin.declareQueue(queue);

java框架之SpringBoot(12)-消息及整合RabbitMQ

    }

    @Test
    public void testBinding(){
        // 创建一个 binding ,绑定交换器与队列
        amqpAdmin.declareBinding(new Binding("myQueue", Binding.DestinationType.QUEUE,"my.directNew","myQueue",null));

java框架之SpringBoot(12)-消息及整合RabbitMQ

    }
}

test