ActiveMQ producer同步/异步发送消息

时间:2022-06-20 03:24:53

http://activemq.apache.org/async-sends.html

producer发送消息有同步和异步两种模式,可以通过代码配置:

((ActiveMQConnection)connection).setUseAsyncSend(true);

producer默认是异步发送消息。在没有开启事务的情况下,producer发送持久化消息是同步的,调用send会阻塞直到broker把消息保存到磁盘并返回确认。

消息设置为持久:

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

消息设置为非持久:

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

producer发送消息的调用栈如下:

ActiveMQ producer同步/异步发送消息

// ActiveMQSession
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination,
Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
// 省略其他代码
// 消息的持久类型和和连接模式是或的:所以只要connection配置为异步,就走异步发送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend()
&& (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else { // 同步发送
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}

producer发送同步消息的调用栈:

ActiveMQ producer同步/异步发送消息

// org.apache.activemq.transport.ResponseCorrelator
public Object request(Object command) throws IOException {
FutureResponse response = asyncRequest(command, null);
return response.getResult();
} public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
Command command = (Command) o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
// 需要回复
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback);
IOException priorError = null;
synchronized (requestMap) {
priorError = this.error;
if (priorError == null) {
requestMap.put(new Integer(command.getCommandId()), future);
}
} if (priorError != null) {
future.set(new ExceptionResponse(priorError));
throw priorError;
} next.oneway(command);
return future;
}

producer发送异步消息的调用栈:

ActiveMQ producer同步/异步发送消息

//org.apache.activemq.transport.ResponseCorrelator
public void oneway(Object o) throws IOException {
Command command = (Command)o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
// 不需要回复
command.setResponseRequired(false);
next.oneway(command);
}

在不考虑事务的情况下:

producer发送持久化消息是同步发送,发送是阻塞的,直到收到确认。同步发送肯定是有流量控制的。

producer默认是异步发送,异步发送不会等待broker的确认, 所以就需要考虑流量控制了:

ActiveMQConnectionFactory.setProducerWindowSize(int producerWindowSize)

ProducerWindowSize的含义:producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。