ActiveMQ消息发送与接收

时间:2021-07-12 10:08:19

推荐文章:ActiveMQ讯息传送机制以及ACK机制

 

ActiveMQ发送消息

  1:创建链接工厂ConnectionFactory

  2:创建链接Connection

  3:启动session

  4:创建消息发送目的地

  5:创建生产者

  6:发送消息

消息发送类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

private static final String USERNAME = "admin";

private static final String PASSWORD = "admin";

private static final String BROKEN_URL = "tcp://127.0.0.1:61616";

private AtomicInteger count = new AtomicInteger();

private ConnectionFactory connectionFactory;

private Connection connection;

private Session session;

private Queue queue;

private MessageProducer producer;

public void init() {
try {
//创建一个链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
//从工厂中创建一个链接
connection = connectionFactory.createConnection();
//启动链接,不启动不影响消息的发送,但影响消息的接收
connection.start();
//创建一个事物session
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//获取消息发送的目的地,指消息发往那个地方
queue = session.createQueue("test");
//获取消息发送的生产者
producer = session.createProducer(queue);
}
catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public void sendMsg(String queueName) {
try {
int num = count.getAndIncrement();
TextMessage msg
= session.createTextMessage(Thread.currentThread().getName()+
"productor:生产者发送消息!,count:"+num);
producer.send(msg);

session.commit();
}
catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}
connection.createSession方法
 /**
* Creates a <CODE>Session</CODE> object.
*
*
@param transacted indicates whether the session is transacted
*
@param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
* session is transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
*
@return a newly created session
*
@throws JMSException if the <CODE>Connection</CODE> object fails to
* create a session due to some internal error or lack of
* support for the specific transaction and acknowledgement
* mode.
*
@see Session#AUTO_ACKNOWLEDGE
*
@see Session#CLIENT_ACKNOWLEDGE
*
@see Session#DUPS_OK_ACKNOWLEDGE
*
@since 1.1
*/
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
if(!transacted) {
if (acknowledgeMode==Session.SESSION_TRANSACTED) {
throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
}
else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
"Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
}
}
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
}
createSession方法里有两个参数,第一个参数表示是否使用事务,第二个参数表示消息的确认模式。消息的确认模式共有4种:
1:AUTO_ACKNOWLEDGE 自动确认
2:CLIENT_ACKNOWLEDGE 客户端手动确认 
3:DUPS_OK_ACKNOWLEDGE 自动批量确认
0:SESSION_TRANSACTED 事务提交并确认
4:INDIVIDUAL_ACKNOWLEDGE 单条消息确认 为AcitveMQ自定义的ACK_MODE
各种确认模式详细说明可以看文章:ActiveMQ讯息传送机制以及ACK机制
从createSession方法中可以看出如果如果session不使用事务但是却使用了消息提交(SESSION_TRANSACTED)确认模式,或使用的消息确认模式不存在,将抛出异常。

ActiveMQ接收消息

  1:创建链接工厂ConnectionFactory

  2:创建链接Connection

  3:启动session

  4:创建消息发送目的地

  5:创建生产者

  6:接收消息或设置消息监听器

消息接收类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class Receiver {

private static final String USERNAME = "admin";

private static final String PASSWORD = "admin";

private static final String BROKEN_URL = "tcp://127.0.0.1:61616";

private AtomicInteger count = new AtomicInteger();

private ConnectionFactory connectionFactory;

private ActiveMQConnection connection;

private Session session;

private Queue queue;

private MessageConsumer consumer;

public void init() {
try {
//创建一个链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
//从工厂中创建一个链接
connection = (ActiveMQConnection) connectionFactory.createConnection();

//启动链接
connection.start();
//创建一个事物session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//获取消息接收的目的地,指从哪里接收消息
queue = session.createQueue("test");
//获取消息接收的消费者
consumer = session.createConsumer(queue);
}
catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public void receiver(String queueName) {

try {
TextMessage msg
= (TextMessage) consumer.receive();
if(msg!=null) {
System.out.println(Thread.currentThread().getName()
+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
}
}
catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
consumer.receive方法
/**
* Receives the next message produced for this message consumer.
* <P>
* This call blocks indefinitely until a message is produced or until this
* message consumer is closed.
* <P>
* If this <CODE>receive</CODE> is done within a transaction, the consumer
* retains the message until the transaction commits.
*
*
@return the next message produced for this message consumer, or null if
* this message consumer is concurrently closed
*/
public Message receive() throws JMSException {
checkClosed(); //检查unconsumedMessages是否关闭 ,消费者从unconsumedMessages对象中获取消息
checkMessageListener(); //检查是否有其他消费者使用了监听器,同一消息消息队列中不能采用reveice和messageListener并存消费消息

sendPullCommand(
0); //如果prefetchSize为空且unconsumedMessages为空 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备
MessageDispatch md
= dequeue(-1); //从unconsumedMessages取出一个消息 
if (md == null) {
return null;
}

beforeMessageIsConsumed(md);
afterMessageIsConsumed(md,
false);

return createActiveMQMessage(md);
}
prefetchSize属性如果大于0,消费者每次拉去消息时都会预先拉取一定量的消息,拉取的消息数量<=prefetchSize,prefetchSize默认指为1000,这个默认值是从connection中传过来的
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
checkClosed();

if (destination instanceof CustomDestination) {
CustomDestination customDestination
= (CustomDestination)destination;
return customDestination.createConsumer(this, messageSelector, noLocal);
}

ActiveMQPrefetchPolicy prefetchPolicy
= connection.getPrefetchPolicy();
int prefetch = 0;
if (destination instanceof Topic) {
prefetch
= prefetchPolicy.getTopicPrefetch();
}
else {
prefetch
= prefetchPolicy.getQueuePrefetch();
}
ActiveMQDestination activemqDestination
= ActiveMQMessageTransformation.transformDestination(destination);
return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal,
false, isAsyncDispatch(), messageListener);
}

消息接收类代码中调用session.createConsumer其实调用的就是上面的createConsumer方法,从上面代码中可以看出connection会将自己的prefetch传递给消费者,connection中的ActiveMQPrefetchPolicy

对象属性如下:

public class ActiveMQPrefetchPolicy extends Object implements Serializable {
public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE;
public static final int DEFAULT_QUEUE_PREFETCH = 1000;
public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;

private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class);

private int queuePrefetch;
private int queueBrowserPrefetch;
private int topicPrefetch;
private int durableTopicPrefetch;
private int optimizeDurableTopicPrefetch;
private int inputStreamPrefetch;
private int maximumPendingMessageLimit;

/**
* Initialize default prefetch policies
*/
public ActiveMQPrefetchPolicy() {
this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
}

这里我只截了一部分代码,可以看到队列(queue)默认的queuePrefetch为1000,queuePrefetch的最大值不能超过MAX_PREFETCH_SIZE(32767)

当然我们也可以自己设置消费者预先拉取的消息数量,方法有两种

一:在创建connection之后修改connection中的queuePrefetch;代码如下:

ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(number);
connection.setPrefetchPolicy(prefetchPolicy);

二:在创建队列(queue)的时候传入参数,回到ActiveMQMessageConsumer的创建代码中:

 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector,
int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync, MessageListener messageListener) throws JMSException {
if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations");
}
else if (dest.getPhysicalName() == null) {
throw new InvalidDestinationException("The destination object was not given a physical name.");
}
else if (dest.isTemporary()) {
String physicalName
= dest.getPhysicalName();

if (physicalName == null) {
throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
}

String connectionID
= session.connection.getConnectionInfo().getConnectionId().getValue();

if (physicalName.indexOf(connectionID) < 0) {
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
}

if (session.connection.isDeleted(dest)) {
throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
}
if (prefetch < 0) {
throw new JMSException("Cannot have a prefetch size less than zero");
}
}
if (session.connection.isMessagePrioritySupported()) {
this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
}
else {
this.unconsumedMessages = new FifoMessageDispatchChannel();
}

this.session = session;
this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
setTransformer(session.getTransformer());

this.info = new ConsumerInfo(consumerId);
this.info.setExclusive(this.session.connection.isExclusiveConsumer());
this.info.setSubscriptionName(name);
this.info.setPrefetchSize(prefetch);
this.info.setCurrentPrefetchSize(prefetch);
this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
this.info.setNoLocal(noLocal);
this.info.setDispatchAsync(dispatchAsync);
this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
this.info.setSelector(null);

// Allows the options on the destination to configure the consumerInfo
if (dest.getOptions() != null) {
Map
<String, Object> options = IntrospectionSupport.extractProperties(
new HashMap<String, Object>(dest.getOptions()), "consumer.");
IntrospectionSupport.setProperties(
this.info, options);
if (options.size() > 0) {
String msg
= "There are " + options.size()
+ " consumer options that couldn't be set on the consumer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" + options + "]."
+ " This consumer cannot be started.";
LOG.warn(msg);
throw new ConfigurationException(msg);
}
}

this.info.setDestination(dest);
this.info.setBrowser(browser);
if (selector != null && selector.trim().length() != 0) {
// Validate the selector
SelectorParser.parse(selector);
this.info.setSelector(selector);
this.selector = selector;
}
else if (info.getSelector() != null) {
// Validate the selector
SelectorParser.parse(this.info.getSelector());
this.selector = this.info.getSelector();
}
else {
this.selector = null;
}

this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
&& !info.isBrowser();
if (this.optimizeAcknowledge) {
this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
}

this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
if (messageListener != null) {
setMessageListener(messageListener);
}
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
}
catch (JMSException e) {
this.session.removeConsumer(this);
throw e;
}

if (session.connection.isStarted()) {
start();
}
}

this.info.setPrefetchSize(prefetch);

又上面代码可以看出,在创建ActiveMQMessageConsumer的过程中,程序会将connection中的queuePrefetch赋给ActiveMQMessageConsumer对象中的info对象(info为一个ConsumerInfo对象)

Map<String, Object> options = IntrospectionSupport.extractProperties(
new HashMap<String, Object>(dest.getOptions()), "consumer.");
IntrospectionSupport.setProperties(
this.info, options);

在创建队列(queue)的过程中,我们可以传一些参数来配置消费者,这些参数的前缀必须为consumer. ,当我们传的参数与info对象中的属性匹配时,将覆盖info对象中的属性值,其传参形式如下:

queueName?param1=value1&param2=value2

所以我们如果想改变消费者预先拉取的消息数量,可以在创建对象的时候传入如下参数

queue = session.createQueue("test?consumer.prefetchSize=number");

 

ActiveMq接收消息--监听器

监听器代码如下:

package com.apt.study.util.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ReceiveListener implements MessageListener {

@Override
public void onMessage(Message message) {
try {
TextMessage msg
= (TextMessage) message;
if(msg!=null) {
System.out.println(Thread.currentThread().getName()
+": Consumer:我是消费者,我正在消费Msg"+msg.getText());
}
}
catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

消息接收类:

package com.apt.study.util.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

public class Receiver {

private static final String USERNAME = "admin";

private static final String PASSWORD = "admin";

private static final String BROKEN_URL = "tcp://127.0.0.1:61616";

private AtomicInteger count = new AtomicInteger();

private ConnectionFactory connectionFactory;

private ActiveMQConnection connection;

private Session session;

private Queue queue;

private MessageConsumer consumer;

public void init() {
try {
//创建一个链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
//从工厂中创建一个链接
connection = (ActiveMQConnection) connectionFactory.createConnection();

//启动链接
connection.start();
//创建一个事物session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

queue
= session.createQueue("test");

consumer
= session.createConsumer(queue);
//设置消息监听器
consumer.setMessageListener(new ReceiveListener());
}
catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
consumer.setMessageListener方法:
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
if (wasRunning) {
session.stop();
}

this.messageListener.set(listener);
session.redispatch(
this, unconsumedMessages);

if (wasRunning) {
session.start();
}
}
else {
this.messageListener.set(null);
}
}

从代码中可以看出,当我们使用监听器时,消费者prefetchSize必须大于0