【RabbitMQ-基础使用(Spring AMQP)】

时间:2022-06-03 00:48:13

目录:

一、RabbitMQ部署

二、认识RabbitMQ

三、RabbitMQ快速入门

四、Spring AMQP

五、各种消息模型实例

六、消息转换器


一、RabbitMQ部署

  • 1、在线拉取image
    命令:​​docker pull rabbitmq:3-management​
  • 2、安装RabbitMQ
    命令中对应的​​username​​和​​password​​修改为自己要设置的用户名及密码即可;
    ​-p 15672:15672​​中​​15672​​端口是Management Plugin管理插件的访问端口,如:​​127.0.0.1:15672​​,或者win访问虚拟机IP+端口:​​192.168.255.1:15672​​;
    ​-p 5672:5672​​中的​​5672​​端口是RabbitMQ容器的默认端口。


docker run \
-d \
-p 15672:15672 \
-p 5672:5672 \
--hostname my-rabbit \
--name mrmq \
-e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3-management

【RabbitMQ-基础使用(Spring AMQP)】

这样,我们就已经完成RabbitMQ的部署,就可以开始使用了。


二、认识RabbitMQ

  • 1、RabbitMQ中的角色

【RabbitMQ-基础使用(Spring AMQP)】


  • publisher:生产者
  • consumer:消费者
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

  • 2、RabbitMQ消息模型

Ⅰ 基本消息队列
① BasicQueue

【RabbitMQ-基础使用(Spring AMQP)】


Ⅱ 工作消息队列
② WorkQueue

【RabbitMQ-基础使用(Spring AMQP)】


Ⅲ 发布订阅模型
③ Fanout Exchange(广播)

【RabbitMQ-基础使用(Spring AMQP)】


④ Direct Exchange(路由)

【RabbitMQ-基础使用(Spring AMQP)】


⑤ Topic Exchange(主题)

【RabbitMQ-基础使用(Spring AMQP)】


MQ存在的好处就是,消息发送方和消息接收方之间可以实现异步通信,由中间的Broker帮助完成。


三、RabbitMQ快速入门

  • 1、消息发送方


// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.253.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("test");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
  • 2、消息接收方


// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.253.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("test");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});

此处的VirtualHost可以通过Management Plugin查询,具体如下:

【RabbitMQ-基础使用(Spring AMQP)】


四、Spring AMQP

此处,我们以基本消息队列​​BasicQueue​​为例

【RabbitMQ-基础使用(Spring AMQP)】


  • 1、Spring AMQP的功能
  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了RabbitTemplate工具,用于发送消息 。
  • 2、基础依赖
    引入依赖:​​spring-boot-starter-amqp​


<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 3、发送消息

① 消息发送方配置application.yml


# rabbitmq
spring:
rabbitmq:
host: 192.168.253.128
port: 5672
virtual-host: /
username: test
password: 123456

② 发送消息


@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "Hello, Spring AMQP!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}

此时,我们就可以通过管理插件查看已经发送的消息。如下:

【RabbitMQ-基础使用(Spring AMQP)】


  • 4、消费消息(接收消息)

① 消息接收方配置application.yml
此处配置与消费方一致。
② 接收消息


@Component
public class SpringRabbitMQListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMsg(String msg){
System.out.println(msg);
}
}

注意:此处消息被消费后,对应的simple.queue​中的消息就消失了


五、各种消息模型实例

基本消息队列​​BasicQueue​​即为上方的代码,此处不再重复。

  • 1、WorkQueue

【RabbitMQ-基础使用(Spring AMQP)】


​WorkQueue​​与​​BasicQueue​​不同之处,就是​​WorkQueue​​支持一对多发布消息(不是一个消息发给多个消费者,一个消息只会被一个消费者消费),多个消费者可以提高消息消费速度,当然相同之处也是消息消费后就会从Queue中消失(后续的几种模型都是如此)。

① 模拟消息堆积


// 队列名称
String queueName = "simple.queue";
// 消息
String message = "Message_";
for (int i = 1; i <= 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}

② 接收消息
此处设置两个线程处理速度不同。


@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}

处理结果是2个消费者会均分消息。可以修改消费方的配置,以按照实际处理能力分配,如下:


spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
  • 2、Fanout

【RabbitMQ-基础使用(Spring AMQP)】


① 编写Fanout配置类
创建FanoutExchange,绑定队列Queue和交换机Exchange。


@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("stone.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

② 发送消息


// 队列名称
String exchangeName = "stone.fanout";
// 消息
String message = "Hello, Fanout!";
rabbitTemplate.convertAndSend(exchangeName, "", message);

③ 接收消息


@RabbitListener(queues = "fanout.queue1")
public void listen1FanoutQueueMsg(String msg){
System.out.println("Listener1 get :" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listen2FanoutQueueMsg(String msg){
System.out.println("Listener2 get :" + msg);
}

不同于​​WorkQueue​​,​​Fanout Exchange​​广播模型下,绑定该交换机的消费者可以获取到对应的消息(即一条消息可以通过交换机被多个消费者消费)。

  • 3、Direct

【RabbitMQ-基础使用(Spring AMQP)】


① 基于注解声明队列和交换机
@RabbitListener的使用
Ⅰ ​​​bindings = @QueueBinding()​​​配置绑定关系;
Ⅱ ​​​value = @Queue(name = "direct.queue1")​​​配置队列;
Ⅲ ​​​exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT)​​​配置交换机; Ⅳ ​​​key = {"talkshow", "musicshow"}​​​配置订阅。


注意:type = ExchangeTypes.DIRECT是默认类型,可以不做配置。


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT),
key = {"talkshow", "musicshow"}
))
public void listenDirectQueue1(String msg){
System.out.println("DirectQueue1 :" + msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "stone.direct", type = ExchangeTypes.DIRECT),
key = {"talkshow", "news"}
))
public void listenDirectQueue2(String msg){
System.out.println("DirectQueue2 :" + msg);
}

② 发送消息


// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String messageNews = "乌俄冲突升级,昔日友邦冷眼旁观!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "news", messageNews);
// 消息
String messageTalks = "蜘蛛侠3英雄无归发布蓝光预告,主演再登SN宣传!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "talkshow", messageTalks);

此时:订阅​​news​​主题的队列​​direct.queue1​​可以消费​​messageNews​​,订阅​​talkshow​​主题的​​direct.queue1​​和​​direct.queue2​​均可以消费​​messageTalks​​。

  • 4、Topic

【RabbitMQ-基础使用(Spring AMQP)】


​Topic​​类型的​​Exchange​​与​​Direct​​相比,都是可以根据​​RoutingKey​​把消息路由到不同的队列。只不过​​Topic​​类型​​Exchange​​可以让队列在绑定​​Routing key​​ 的时候使用通配符。通配符规则:
​#​​:匹配一个或多个词
​*​​:匹配不多不少恰好1个词
① 发送消息


/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "建设更高水平法治中国";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

② 接收消息


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC),
key = {"China.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("TopicQueue1 :" + msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "stone.topic", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("TopicQueue2 :" + msg);
}

此时,由于消息Topic满足两个队列的订阅规则,所以两个队列都可以消费到消息。


六、消息转换器

  • 1、默认转换器

① 发送消息


// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jackson");
msg.put("age", 24);
// 发送消息
rabbitTemplate.convertAndSend("simple.queue","", msg);

【RabbitMQ-基础使用(Spring AMQP)】


此时,我们通过管理平台可以看到传送的消息是序列化后的消息,是Java原生的序列化类型。
② 接收消息


@RabbitListener(queues = "object.queue")
public void listenObjectQueue(HashMap<String, Object> msg){
System.out.println("object.queue get msg is: " + msg);
}

接收到的消息也可以正常被反序列化:​​object.queue get msg is: {name=Jackson, age=24}​​。

  • 2、配置JSON转换器

注意:在修改原生转换器时,要同时修改消息发送方和接收方的转换器,不然会报错。
① 基础依赖
在消息发送方和接收方,添加JSON转换器的依赖​​jackson-dataformat-xml​​。


<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

② 注册JSON转换器Bean
在消息发送方和接收方,注册JSON转换器到Spring容器中。


@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}

【RabbitMQ-基础使用(Spring AMQP)】


此时,我们再发送消息,管理平台显示的就是JSON类型。


七、结尾

以上即为RabbitMQ的基础内容