ActiveMQ之HelloWorld

时间:2023-03-10 03:13:44
ActiveMQ之HelloWorld

JMS实现JMS接口的消息中间件

Provider:生产者

Consumer:消费者

PTP:Point to Point:点对点的消息模型

Pub/Sub:Publish/Subscribe:发布订阅的消息模型

Queue:队列目标;

Topic:主题目标

ConnectionFactory:连接工厂,JMS用它创建连接、

Connection:JMS客户端到JMS Provider的连接

Destination:消息的目的地

Session:会话,一个发送或者接收消息的线程

ActiveMQ简介

ActiveMQ是Apache出品,最流行,能力强劲的开源消息总线。

ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演者特殊的地位,可以说ActiveMQ在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断的升级版本,80%一手的业务,使用ActMQ已经足够满肚需求,大型电商网站,可以去应用RocketMQ,分布式消息中间件来实现。

下面看一个helloword:

首先,启动activeMq

ActiveMQ之HelloWorld

启动后,在浏览器访问ActiveMQ之HelloWorld这个地址,就会出现activeMq的界面点击第一个选项,出现登录界面,账号密码都是admin,

ActiveMQ之HelloWorld

这就是登录后的界面,下面开始看一下代码:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws JMSException {
// 第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要连接的地址。默认端口为"tcp://localhost:61616"
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"); // 第二步,通过ConnectionFactory工厂对象我们创建以一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的
Connection connection = connectionFactory.createConnection();
connection.start(); // 第三步,通过connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般我们设置自动签收。
Session session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE); // 第四步,通过Session创建Destination对象,指的是一个客户端用来指定生产消费目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue队里
// 在Pub/Sub模式,Destination被称作Topic即主题,在程序中可以使用多个Queue和Topic
Destination desiDestination = session.createQueue("queue1"); // 第五步,我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)
// MessageProducer/MessageConsumer
MessageProducer messageProducer = session
.createProducer(desiDestination); // 第六步:使用MessageProducer的setDeliveryMode方法为其设置持久性和非持久性(setDeliveryMode);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 第七步:使用JMS规范的textMesage形式创建数据(通过session对象),并未MessageProducer的send方法发送数据,最后关闭Connection链接。
for(int i = 0; i < 5; i++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("测试消息!!!id为" + i);
messageProducer.send(textMessage);
} if(connection != null) {
connection.close();
} } }

这里,将消息发送给activeMq,我们运行一下,然后查看一下activeMq里面是否有发送的数据。

点击浏览器中activeMq界面的queue选项。

ActiveMQ之HelloWorld

这里面有一个刚刚新创建的queue,点进去,是刚刚存入的5条记录。

ActiveMQ之HelloWorld

刚刚那是生产者,下面看一下消费者端的代码:

消费者消费后,activeMQ中将不存在数据。

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive {
public static void main(String[] args) throws JMSException {
// 第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要连接的地址。默认端口为"tcp://localhost:61616"
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"); // 第二步,通过ConnectionFactory工厂对象我们创建以恶搞Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的
Connection connection = connectionFactory.createConnection();
connection.start(); // 第三步,通过connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为啥都启用事务,参数配置2为签收模式,一般我们设置自动签收。
Session session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE); // 第四步,通过Session创建Destination对象,指的是一个客户端用来指定生产消费目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue队里
// 在Pub/Sub模式,Destination被称作Topic即主题,在程序中可以使用多个Queue和Topic
Destination desiDestination = session.createQueue("queue1"); // 第五步,我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)
// MessageProducer/MessageConsumer
MessageConsumer messageConsumer = session
.createConsumer(desiDestination); while(true){
TextMessage msg = (TextMessage)messageConsumer.receive();
if(msg == null){
break;
}
System.out.println("收到的內容:"+msg.getText());
} if(connection != null) {
connection.close();
}
}
}

下面运行一下这个消费者:

ActiveMQ之HelloWorld

将刚刚存入的数据,全部获取出来,下面看一下浏览器的activeMq。

ActiveMQ之HelloWorld

待处理的数量是0,点进去也是空的,说明刚刚生产的数据已经被全部消费掉了。