ActiveMQ消息中间件

时间:2023-01-25 18:52:10

最近学习到ActiveMQ,之前也没有用过相关或者类似的工具,因此特地写个文章进行相关的学习记录。

相关参考博文:https://www.cnblogs.com/cyfonly/p/6380860.htmlhttps://blog.csdn.net/qq_26641781/article/details/80408987https://blog.csdn.net/qinweili751/article/details/80620104

 

1.安装ActiveMQ

(1)进入官网http://activemq.apache.org/,选择最新的版本下载

ActiveMQ消息中间件

(2)再选择对应的系统环境(我这里选择的是windows版本)

ActiveMQ消息中间件

(3)下载完成后将其解压(我这里将它存放在D盘根目录下),目录结构如下

ActiveMQ消息中间件

(4)进入bin/win64/目录,启动activemq.bat文件(注意:MQ与jdk版本必须要匹配。我这里下载的MQ是5.15版本,对应的jdk最低要求是1.8)。

ActiveMQ消息中间件

ActiveMQ消息中间件

(5)启动完成后,输入浏览器地址http://localhost:8161/admin,会弹出用户名/密码输入框

ActiveMQ消息中间件

我们的账号密码是存放在ActiveMQ根目录的conf/jetty-realm.properties文件中。

ActiveMQ消息中间件

打开可以看到最下面已经有两个创建好了的用户了。如果我们需要添加自己的用户,或是修改它们的角色,都可以按照上面所写的格式"用户名:密码 [,角色]"来进行配置(角色被定义在~/conf/jetty.xml中)。

这里我们使用默认帐号,admin/admin

ActiveMQ消息中间件

(6)至此,我们的ActiveMQ已经安装完成。

 

2.使用ActiveMQ

  • ActiveMQ的使用一般分为以下几个步骤:
  • connectionFactory:创建连接工厂;
  • connection:从连接工厂中得到连接;
  • session:从连接中获得一个会话;
  • destination:从会话中获取一个destination。可以是Queue(P2P)或Topic(Pub/Sub)
  • Producer:根据session和destination创建服务生产者。
  •   Message:根据session创建消息。
  •   send:消息生产者将message发送给MQ
  • Consumer:根据session和destination创建服务消费者。
  •   receive:接收MQ中的消息。可以是同步接收,也可以是创建监听器异步接收。
  • 关闭资源。

由于我这里是Springboot的项目,因此有部分步骤已经在自动配置中处理好了。

(1)引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.4</version>
        </dependency>

(2)添加相关配置

#默认端口是61616,而不是我们访问网站的8161端口
spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=false

(3)添加配置类(PS:如果不配置该类,默认只会使用P2P,即设置Queue为destination。如果要使用Topic,则必须要配置下面的类)。

@Configuration
public class JmsConfig {

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);  //Queue是P2P,因此Pub/Sub设置为false。默认是false。
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);  //Topic是Pub/Sub,需要显示声明。
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

}

(4)编写服务消费者

这里为了能区别P2P和Pub/Sub,创建了两个服务消费者。

消费者1

@Service
public class ConsumerService {

    @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueue(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer接收到Queue消息:" + msg);
    }

    @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer接收到Topic消息:" + msg);
    }

}

消费者2

@Service
public class Consumer2Service {

    @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueue(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer2接收Queue消息:" + msg);
    }

    @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer2接收到Topic消息:" + msg);
    }
}

(5)编写服务生产者

@Service
public class ProducerService {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination, String msg) {
        System.out.println(LocalDateTime.now().toString() + " productor发送消息:" + msg);
        jmsTemplate.convertAndSend(destination, msg);
    }

}

(6)测试类

先测试P2P下的消息:

    @Test
    public void testMQQueue() {
        Destination destination = new ActiveMQQueue("springboot.queue.test");
        for (int i = 0; i < 3; i++) {
            producerService.sendMessage(destination, "hellow world " + i);
        }
    }

输出结果

2019-05-22T17:29:39.324 productor发送消息:hellow world 0
2019-05-22T17:29:39.373 consumer2接收Queue消息:hellow world 0
2019-05-22T17:29:39.379 productor发送消息:hellow world 1
2019-05-22T17:29:39.385 productor发送消息:hellow world 2
2019-05-22T17:29:39.388 consumer接收到Queue消息:hellow world 1
2019-05-22T17:29:39.391 consumer2接收Queue消息:hellow world 2

可以看到生产者每发出一个消息,都只会有一个消费者对消息进行处理。并且这里采用的是轮询的方式,即这次是消费者1接收了消息,下次就是消费者2接收,再下次又是消费者1。以此类推。

 

然后我们再测试下Pub/Sub的消息:

    @Test
    public void testMQTopic() {
        Destination destination = new ActiveMQTopic("springboot.topic.test");
        for (int i = 0; i < 2; i++) {
            producerService.sendMessage(destination, "hellow world " + i);
        }
    }

输出结果:

2019-05-22T17:35:58.535 productor发送消息:hellow world 0
2019-05-22T17:35:58.576 productor发送消息:hellow world 1
2019-05-22T17:35:58.581 consumer接收到Topic消息:hellow world 0
2019-05-22T17:35:58.582 consumer接收到Topic消息:hellow world 1
2019-05-22T17:35:58.582 consumer2接收到Topic消息:hellow world 0
2019-05-22T17:35:58.584 consumer2接收到Topic消息:hellow world 1

这里可以看到,每一个消息被发出来后,会被所有的服务消费者接收并处理。