ActiveMQ的环境搭建及使用

时间:2023-03-10 07:41:55
ActiveMQ的环境搭建及使用

一:环境搭建

ActiveMQ官网下载mq在windows上的安装包:http://activemq.apache.org/,解压到某个磁盘下。

ActiveMQ的环境搭建及使用

运行要环境条件:jdk安装1.8,(本人这里安装版本)。

到达解压目录,进入\bin\win64(测试机器是x64,如果你的机器是x86,请进入\bin\win32),双击运行“conf\activemq.bat”。

此时我这里抛出了一个异常:

jvm 1    |  WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: 
  Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed;
  nested exception is java.net.URISyntaxException: Illegal character in hostname atindex 7: ws://PC_DX_20151117:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | ERROR: java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException:
  BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext

从上边错误信息可以得知这里是发布ws服务是失败,解决方案:修改activemq.xml文件:

        <transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<!--注释掉抛出错误的默认的配置:错误原因,计算机系统原因不支持发布ws服务。-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<!--<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>

备注:这里是把发布ws服务给注释掉了。

之后,重新启动,启动成功。

然后:http://127.0.0.1:8161/admin/ 验证服务启动情况,用户名/密码 admin/admin。界面如下:

ActiveMQ的环境搭建及使用

ActiveMQ 点对点消息实现 - 直接receive方式

新建一个java project工程,引入activemq安装包根目录的activemq-all-5.15.3.jar包。

ActiveMQ的环境搭建及使用

消息生产者:

package com.activemq.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
private static final int SENDNUM = 10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
destination = session.createQueue("FirstQueue1"); // 创建消息队列
messageProducer = session.createProducer(destination); // 创建消息生产者
sendMessage(session, messageProducer); // 发送消息
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
} /**
* 发送消息
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
System.out.println("发送消息:" + "ActiveMQ 发送的消息" + i);
messageProducer.send(message);
}
}
}

消息消费者:

package com.activemq.consumer.receive;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消息消费者
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = "tcp://127.0.0.1:61616"; // ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
destination = session.createQueue("FirstQueue1"); // 创建连接的消息队列
messageConsumer = session.createConsumer(destination); // 创建消息消费者
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

ActiveMQ 点对点消息实现 - 使用Listener监听方式

生产者代码同上.

监听器类:

package com.activemq.consumer.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; /**
* 消息监听
*/
public class Listener implements MessageListener {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} }

消息消费者中注册监听器:

package com.activemq.consumer.listener;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消息消费者
*
* @author Administrator
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
destination = session.createQueue("FirstQueue1"); // 创建连接的消息队列
messageConsumer = session.createConsumer(destination); // 创建消息消费者
messageConsumer.setMessageListener(new Listener()); // 注册消息监听
} catch (JMSException e) {
e.printStackTrace();
}
}
}

ActiveMQ 点对点消息实现 - ActiveMQ 发布-订阅消息模式实现

一个生产者发布对应多个消费者订阅。
消息的生产者:
package com.activemqmulti.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消息生产者-消息发布者*/
public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL ="tcp://127.0.0.1:61616"; // ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
private static final int SENDNUM = 10; // 发送的消息数量 public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
// destination=session.createQueue("FirstQueue1");
destination = session.createTopic("FirstTopic1"); // 创建消息队列
messageProducer = session.createProducer(destination); // 创建消息生产者
sendMessage(session, messageProducer); // 发送消息
session.commit();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
} /**
* 发送消息*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
System.out.println("发送消息:" + "ActiveMQ 发布的消息" + i);
messageProducer.send(message);
}
}
}

消息的监听器一:

package com.activemqmulti.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; /**
* 消息监听-订阅者一
*/
public class Listener implements MessageListener { @Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("订阅者一收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} }

消息的监听器二:

package com.activemqmulti.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; /**
* 消息监听-订阅者二
*/
public class Listener2 implements MessageListener { @Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("订阅者二收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} }

消息的订阅者一:

package com.activemqmulti.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消息消费者-消息订阅者一
*/
public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL ="tcp://127.0.0.1:61616"; // ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
// destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列
destination = session.createTopic("FirstTopic1");
messageConsumer = session.createConsumer(destination); // 创建消息消费者
messageConsumer.setMessageListener(new Listener()); // 注册消息监听
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

消息的订阅者二:

package com.activemqmulti.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; /**
* 消息消费者-消息订阅者二
*/
public class JMSConsumer2 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = "tcp://127.0.0.1:61616"; // ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL); try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
// destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列
destination = session.createTopic("FirstTopic1");
messageConsumer = session.createConsumer(destination); // 创建消息消费者
messageConsumer.setMessageListener(new Listener2()); // 注册消息监听
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}