设置发布和订阅消息的 RabbitMQ AMQP 服务器

时间:2022-12-22 11:23:50

设置发布和订阅消息的 RabbitMQ AMQP 服务器

本指南将引导您完成设置发布和订阅消息的 RabbitMQ AMQP 服务器的过程,并创建一个 Spring 引导应用程序以与该 RabbitMQ 服务器进行交互。

您将构建什么

您将构建一个应用程序,该应用程序使用 Spring AMQP 发布消息,并使用 在 POJO 上订阅消息。​​RabbitTemplate​​​​MessageListenerAdapter​

你需要什么

  • 约15分钟
  • 最喜欢的文本编辑器或 IDE
  • JDK 11或以后
  • 格拉德尔 4+​或梅文 3.2+
  • 您也可以将代码直接导入到 IDE 中:
  • 弹簧工具套件 (STS)
  • 智能理念
  • VSCode
  • 设置 RabbitMQ 服务器。看设置 RabbitMQ 代理.

如何完成本指南

像大多数春天一样入门指南,您可以从头开始并完成每个步骤,也可以绕过您已经熟悉的基本设置步骤。无论哪种方式,您最终都会得到工作代码。

要从头开始,请继续设置 RabbitMQ 代理.

要跳过基础知识,请执行以下操作:

  • 下载​并解压缩本指南的源存储库,或使用吉特:git clone https://github.com/spring-guides/gs-messaging-rabbitmq.git
  • 光盘成gs-messaging-rabbitmq/initial
  • 跳转到从 Spring 初始化开始.

完成后,您可以根据 中的代码检查结果。​​gs-messaging-rabbitmq/complete​

设置 RabbitMQ 代理

在构建消息传递应用程序之前,需要设置服务器来处理消息的接收和发送。

RabbitMQ 是一个 AMQP 服务器。服务器可在以下网址免费获得https://www.rabbitmq.com/download.html.您可以手动下载它,或者,如果您使用带有自制软件的 Mac,则通过在终端窗口中运行以下命令:

brew install rabbitmq

解压缩服务器并通过在终端窗口中运行以下命令以默认设置启动它:

rabbitmq-server

应会看到类似于以下内容的输出:

RabbitMQ 3.1.3. Copyright (C) 2007-2013 VMware, Inc.
## ## Licensed under the MPL. See https://www.rabbitmq.com/
## ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
##########
Starting broker... completed with 6 plugins.

您也可以使用码头工人撰写以快速启动 RabbitMQ 服务器(如果您在本地运行 Docker)。在Github的项目根目录中有一个。这非常简单,如以下列表所示:​​docker-compose.yml​​​​complete​

rabbitmq:
image: rabbitmq:management
ports:
- "5672:5672"
- "15672:15672"

将此文件放在当前目录中,您可以运行以使 RabbitMQ 在容器中运行。​​docker-compose up​

从 Spring 初始化开始

你可以使用这个预初始化项目,然后单击生成以下载 ZIP 文件。此项目配置为适合本教程中的示例。

手动初始化项目:

  1. 导航到https://start.spring.io.此服务拉入应用程序所需的所有依赖项,并为您完成大部分设置。
  2. 选择 Gradle 或 Maven 以及您要使用的语言。本指南假定您选择了 Java。
  3. 单击依赖关系,然后为 RabbitMQ 选择 Spring
  4. 单击生成
  5. 下载生成的 ZIP 文件,该文件是配置了您选择的 Web 应用程序的存档。

如果您的 IDE 集成了 Spring Initializr,则可以从 IDE 完成此过程。

您也可以从 Github 分叉项目,然后在 IDE 或其他编辑器中打开它。

创建 RabbitMQ 消息接收器

对于任何基于消息传递的应用程序,都需要创建一个响应已发布消息的接收器。以下清单(来自 )显示了如何执行此操作:​​src/main/java/com.example.messagingrabbitmq/Receiver.java​

package com.example.messagingrabbitmq;

import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

private CountDownLatch latch = new CountDownLatch(1);

public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}

public CountDownLatch getLatch() {
return latch;
}

}

这是一个 POJO,用于定义接收消息的方法。当您注册它以接收消息时,您可以将其命名为所需的任何名称。​​Receiver​

为方便起见,此 POJO 还具有 .这让它发出信号,表明消息已收到。这是您不太可能在生产应用程序中实现的。​​CountDownLatch​

注册侦听器并发送消息

Spring AMQP 提供了使用 RabbitMQ 发送和接收消息所需的一切。但是,您需要:​​RabbitTemplate​

  • 配置消息侦听器容器。
  • 声明队列、交换以及它们之间的绑定。
  • 配置组件以发送一些消息来测试侦听器。

Spring Boot 会自动创建一个连接工厂和一个 RabbitTemplate,从而减少您必须编写的代码量。

您将用于发送消息,并且您将向消息侦听器容器注册以接收消息。连接工厂驱动两者,让它们连接到 RabbitMQ 服务器。下面的清单(来自)显示了如何创建应用程序类:​​RabbitTemplate​​​​Receiver​​​​src/main/java/com.example.messagingrabbitmq/MessagingRabbitApplication.java​

package com.example.messagingrabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MessagingRabbitmqApplication {

static final String topicExchangeName = "spring-boot-exchange";

static final String queueName = "spring-boot";

@Bean
Queue queue() {
return new Queue(queueName, false);
}

@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}

public static void main(String[] args) throws InterruptedException {
SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
}

}

​@SpringBootApplication​​是一个方便的注释,它添加了以下所有内容:

  • ​@Configuration​​:将类标记为应用程序上下文的 Bean 定义源。
  • ​@EnableAutoConfiguration​​:告诉 Spring 引导根据类路径设置、其他 bean 和各种属性设置开始添加 bean。例如,如果 在类路径上,则此注释会将应用程序标记为 Web 应用程序并激活关键行为,例如设置 .spring-webmvcDispatcherServlet
  • ​@ComponentScan​​:告诉 Spring 在包中查找其他组件、配置和服务,让它找到控制器。com/example

该方法使用 Spring Boot 的方法启动应用程序。您是否注意到没有一行 XML?也没有文件。此 Web 应用程序是 100% 纯 Java,您无需处理配置任何管道或基础结构。​​main()​​​​SpringApplication.run()​​​​web.xml​

方法中定义的 Bean 在容器中注册为消息侦听器(在 中定义)。它侦听队列中的消息。由于该类是 POJO,因此需要将其包装在 中,您可以在其中指定它调用 。​​listenerAdapter()​​​​container()​​​​spring-boot​​​​Receiver​​​​MessageListenerAdapter​​​​receiveMessage​

JMS 队列和 AMQP 队列具有不同的语义。例如,JMS 仅将排队的消息发送给一个使用者。虽然 AMQP 队列执行相同的操作,但 AMQP 生产者不会将消息直接发送到队列。相反,消息被发送到交易所,该交易所可以转到单个队列或扇出到多个队列,从而模拟 JMS 主题的概念。

消息侦听器容器和接收方 bean 是侦听消息所需的全部内容。要发送消息,您还需要一个兔子模板。

该方法创建一个 AMQP 队列。该方法创建主题交换。该方法将这两者绑定在一起,定义发布到交易所时发生的行为。​​queue()​​​​exchange()​​​​binding()​​​​RabbitTemplate​

Spring AMQP 要求将 、 和 声明为* Spring bean,以便正确设置。​​Queue​​​​TopicExchange​​​​Binding​

在这种情况下,我们使用主题交换,队列绑定路由密钥 ,这意味着使用以 开头的路由密钥发送的任何消息都将路由到队列。​​foo.bar.#​​​​foo.bar.​

发送测试消息

在此示例中,测试消息由 发送,该消息还等待接收器中的锁存器并关闭应用程序上下文。以下清单(来自 )显示了它的工作原理:​​CommandLineRunner​​​​src/main/java/com.example.messagingrabbitmq/Runner.java​

package com.example.messagingrabbitmq;

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;

public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}

@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}

}

请注意,模板使用与绑定匹配的路由密钥将消息路由到交换。​​foo.bar.baz​

在测试中,您可以模拟运行器,以便可以单独测试接收器。

运行应用程序

该方法通过创建 Spring 应用程序上下文来启动该过程。这将启动消息侦听器容器,该容器开始侦听消息。有一个 bean,然后它会自动运行。它从应用程序上下文中检索 并在队列上发送消息。最后,它关闭 Spring 应用程序上下文,应用程序结束。​​main()​​​​Runner​​​​RabbitTemplate​​​​Hello from RabbitMQ!​​​​spring-boot​

构建可执行的 JAR

您可以使用 Gradle 或 Maven 从命令行运行应用程序。您还可以构建一个包含所有必需依赖项、类和资源的可执行 JAR 文件并运行该文件。通过构建可执行 jar,可以轻松地在整个开发生命周期中跨不同环境等将服务作为应用程序进行交付、版本控制和部署。

如果使用 Gradle,则可以使用 .或者,您可以使用 JAR 文件生成 JAR 文件,然后运行该文件,如下所示:​​./gradlew bootRun​​​​./gradlew build​

java -jar build/libs/gs-messaging-rabbitmq-0.1.0.jar

如果使用 Maven,则可以使用 运行应用程序。或者,您可以使用 JAR 文件生成 JAR 文件,然后运行该文件,如下所示:​​./mvnw spring-boot:run​​​​./mvnw clean package​

java -jar target/gs-messaging-rabbitmq-0.1.0.jar

此处描述的步骤将创建一个可运行的 JAR。你也可以构建经典 WAR 文件.

应会看到以下输出:

Sending message...
Received <Hello from RabbitMQ!>

总结

祝贺!您刚刚使用 Spring 和 RabbitMQ 开发了一个简单的发布和订阅应用程序。您可以通过以下方式做更多的事情春天和兔子MQ比这里涵盖的内容,但本指南应该提供一个良好的开端。