Activemq消息的发送与接收

时间:2022-12-21 10:10:38

这两天做项目用到即时消息的发送与接受,所以突击自学activemq。如果能看到这,说明你已经搜了好多资料了,这里我就不多说了。经过我的学习与网上的各种资料整理,封装了一个简单轻便的工具类方便大家使用。

第一步:需要导入jar包

	<!-- activemq 开始 -->
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-core</artifactId>
		<version>5.7.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-pool</artifactId>
		<version>5.12.1</version>

	</dependency>
	<!-- activemq 结束 -->

第二步:工具类

package com.mwk.common.utils;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * activemq消息交互
 * @author 凯凯
 *
 */
public class ActiveMqUtil {
	//地址
	private static final String URL="tcp://localhost:61616";
	//设置接收者接收消息的时间
	private static final int RECEIVE_TIME=100000;
	
	/**
	 * 发送者
	 * @param mesage
	 */
	public static void Sender(String mesage,String MWK_Queue) {
		// ConnectionFactory :连接工厂,JMS 用它创建连接
		ConnectionFactory connectionFactory;
		// Connection :JMS 客户端到JMS Provider 的连接
		Connection connection = null;
		// Session: 一个发送或接收消息的线程
		Session session;
		// Destination :消息的目的地;消息发送给谁.
		Destination destination;
		// MessageProducer:消息发送者
		MessageProducer producer;
		// TextMessage message;
		// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, URL);
		try {
			// 构造从工厂得到连接对象
			connection = connectionFactory.createConnection();
			// 启动
			connection.start();
			// 获取操作连接
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
			destination = session.createQueue(MWK_Queue);
			// 得到消息生成者【发送者】
			producer = session.createProducer(destination);
			// 设置不持久化,此处学习,实际根据项目决定
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			// 构造消息,此处写死,项目就是参数,或者方法获取
			sendMessage(session, producer, mesage);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}

	private static void sendMessage(Session session, MessageProducer producer, String mesage) throws Exception {
		
			TextMessage message = session.createTextMessage(mesage);
			// 发送消息到目的地方
			System.out.println("发送:" + mesage);
			producer.send(message);
	}
	// -------------------------------------------------------------------------

	/**
	 * 接收者
	 * @return
	 */
	public static TextMessage Receiver(String MWK_Queue) {
		// ConnectionFactory :连接工厂,JMS 用它创建连接
		ConnectionFactory connectionFactory;
		// Connection :JMS 客户端到JMS Provider 的连接
		Connection connection = null;
		// Session: 一个发送或接收消息的线程
		Session session;
		// Destination :消息的目的地;消息发送给谁.
		Destination destination;
		// 消费者,消息接收者
		MessageConsumer consumer;
		connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, URL);
		try {
			// 构造从工厂得到连接对象
			connection = connectionFactory.createConnection();
			// 启动
			connection.start();
			// 获取操作连接
			session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
			destination = session.createQueue(MWK_Queue);
			consumer = session.createConsumer(destination);
			while (true) {
				// 设置接收者接收消息的时间
				TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIME);
				if (null != message) {
					System.out.println("收到" + message.getText());
					return message;
				} else {
					break;
				}
			}
		} catch (Exception e) {
			
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
		return null;
	}

}


有改进的地方请各位留言,相互学习,谢谢