ActiveMQ的使用笔记(基本实现原理)

时间:2023-03-08 23:30:42
ActiveMQ的使用笔记(基本实现原理)

具体原理不进行深入,会用就好。

第一:当然是先安装ActiveMQ,选择操作系统位数,安装成功以后,输入网址http://ip:8161/admin/,会出现相关页面,账号密码都是admin。在这个页面上可以看到消息队列的信息。consume和send使用较多。

ActiveMQ的使用笔记(基本实现原理)

第二:创建produce和consume。

  基本的原理都是一样的,JMS(java message send)的13个规范之一,消息中间件的一些组件拼接连通就行了。具体参考代码注释。

容易出错的地方是消息的格式。

这里有点对点模式和发布/订阅模式。我使用的时候是在初始化ContextListener的时候初始化ActiveMQ,然后并重载了onMessage事件(MessageConsumer里面定义了一些接口,可以去使用),这样就不需要一直主动去询问提供者了,队列中有消息时会自动触发该事件。

参考代码:

package com.enjoyor.soa.traffic.server.nmim.listener;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener; import com.enjoyor.soa.traffic.server.nmim.util.ActiveMQUtil;
import com.enjoyor.soa.traffic.server.nmim.util.Constants;
import com.enjoyor.soa.traffic.server.nmim.util.InitConfig;
import com.enjoyor.soa.traffic.server.nmim.service.inner.MqManageService;
import com.enjoyor.soa.traffic.util.frame.spring.SpringContextUtil;
import com.enjoyor.soa.traffic.util.helper.LoggerHelper;
import com.enjoyor.soa.traffic.util.pojo.ResultPojo; /**
*
* ContextListener
* 2016-11-25
*
*/
public class ContextListener implements ServletContextListener
{
public void contextDestroyed(ServletContextEvent arg0)
{
System.out.print("contextDestroyed");
} MqManageService manageService; public void contextInitialized(ServletContextEvent arg0)
{
System.out.print("contextInitialized");
String MQNAME = "";
InitConfig initConfig = new InitConfig();
initConfig.init();
MQNAME = InitConfig.confCache.get(Constants.MQNAME).toString();
ActiveMQUtil.getInstance().receive("queue://" + MQNAME, new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
if(null==manageService){
manageService = (MqManageService) SpringContextUtil.getBean("mqManageService");
manageService.setListDic();
}
//MqManageService manageService = (MqManageService) SpringContextUtil.getBean("mqManageService");
ResultPojo pojo = manageService.getMqMsg(msg);
if(!pojo.getappCode().equals("0")){
LoggerHelper.LOG.error("接收失败"+pojo.getResultList());
}
} catch (Exception e) {
LoggerHelper.LOG.error(e);
} }
});
} } ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
package com.enjoyor.soa.traffic.server.nmim.util; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator; public class ActiveMQUtil { private ActiveMQUtil(){ }
private JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
} private static ActiveMQUtil mq;
public static ActiveMQUtil getInstance(){
if(mq==null){
ApplicationContext context = new ClassPathXmlApplicationContext("activemq.xml");
mq=(ActiveMQUtil)context.getBean("activeMQUtil");
}
return mq;
} public Destination getDestination(String name){
Destination dest=null;
boolean pubSubDomain=name.toLowerCase().indexOf("topic://")==0;
String destName = name.replaceFirst("(?i)(topic|queue)://", "");
if(pubSubDomain){
dest= new ActiveMQTopic(destName);
}else{
dest= new ActiveMQQueue(destName);
}
return dest;
} public String receive(Destination dest) {
try {
TextMessage txtmsg = (TextMessage) getInstance().getJmsTemplate().receive(dest);
if(null!=txtmsg)return txtmsg.getText();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public String receive(String name) {
return receive(getDestination(name));
}
public Connection receive(Destination dest,MessageListener listener){
try{
ConnectionFactory factory = getInstance().getJmsTemplate().getConnectionFactory();
Connection connection = factory.createConnection();
connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(listener);
return connection;
} catch (JMSException e) { }
return null;
}
public Connection receive(String name,MessageListener listener){
return receive(getDestination(name),listener);
} public void send(Destination dest,final String msg){
try {
jmsTemplate.send(dest,new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage(msg);
return message;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public void send(String name,final String msg){
send(getDestination(name),msg);
} }

注意将ContextListener初始化时注入spring容器中。任何类都需要一个入口,只有进入spring容器中,在jvm中被调用,才有存在的价值。