ActiveMQ异步分发消息

时间:2023-03-09 15:34:45
ActiveMQ异步分发消息

org.apache.activemq.ActiveMQConnection 类中有个参数:

protected boolean dispatchAsync=true;

这个参数的含义到底是什么?

使用这个参数的调用栈如下:

ActiveMQ异步分发消息

org.apache.activemq.broker.region.PrefetchSubscription.dispatch

protected boolean dispatch(final MessageReference node) throws IOException {
final Message message = node.getMessage();
if (message == null) {
return false;
} okForAckAsDispatchDone.countDown(); // No reentrant lock - Patch needed to IndirectMessageReference on method lock
MessageDispatch md = createMessageDispatch(node, message);
// NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
dispatched.add(node);
} else {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
if (info.isDispatchAsync()) {
md.setTransmitCallback(new TransmitCallback() { @Override
public void onSuccess() {
// Since the message gets queued up in async dispatch, we don't want to
// decrease the reference count until it gets put on the wire.
onDispatch(node, message);
} @Override
public void onFailure() {
Destination nodeDest = (Destination) node.getRegionDestination();
if (nodeDest != null) {
if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment();
nodeDest.getDestinationStatistics().getInflight().increment();
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
}
}
}
});
context.getConnection().dispatchAsync(md);
} else {
context.getConnection().dispatchSync(md);
onDispatch(node, message);
}
return true;
}

异步和同步分别对应 TransportConnection 类的2个方法:dispatchAsync,dispatchSync

先分析同步代码:

public void dispatchSync(Command message) {
try {
processDispatch(message);
} catch (IOException e) {
serviceExceptionAsync(e);
}
}

很干脆,直接调用 processDispatch 方法。

再分析异步发送:

public void dispatchAsync(Command message) {
if (!stopping.get()) {
if (taskRunner == null) {
dispatchSync(message);
} else {
synchronized (dispatchQueue) {
dispatchQueue.add(message);
}
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} else {
if (message.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) message;
TransmitCallback sub = md.getTransmitCallback();
broker.postProcessDispatch(md);
if (sub != null) {
sub.onFailure();
}
}
}
}

先把消息加入到dispatchQueue中,然后唤醒taskRunner。

taskRunner线程的调用栈如下:

ActiveMQ异步分发消息

public boolean iterate() {
try {
if (pendingStop || stopping.get()) {
if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) {
try {
dispatch(new ShutdownInfo());
} catch (Throwable ignore) {
}
}
dispatchStoppedLatch.countDown();
}
return false;
}
if (!dispatchStopped.get()) {
Command command = null;
synchronized (dispatchQueue) {
if (dispatchQueue.isEmpty()) {
return false;
}
command = dispatchQueue.remove(0);
}
processDispatch(command);
return true;
}
return false;
} catch (IOException e) {
if (dispatchStopped.compareAndSet(false, true)) {
dispatchStoppedLatch.countDown();
}
serviceExceptionAsync(e);
return false;
}
}

最终调用的也是 processDispatch 方法。