ActiveMQ Message Groups

时间:2021-12-17 02:15:45

http://activemq.apache.org/message-groups.html

与Exclusive Consumer相比,Message Groups的对消息分组的粒度更细。具有相同groupId的消息会被投送到同一个消费者,除非这个消费者挂了。

代码示例:

Mesasge message = session.createTextMessage("<foo>hey</foo>");
// 设置groupId
message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
// 设置sequence
message.setIntProperty("JMSXGroupSeq", -1); producer.send(message);

对应的代码在 org.apache.activemq.broker.region.Queue 中:

// 判断消息能否分发给消费者,返回true表示可以
// Subscription 表示消费者,QueueMessageReference 表示消息
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node)
throws Exception {
// 默认为true
boolean result = true;
// Keep message groups together.
// 获取消息的"JMSXGroupID"属性
String groupId = node.getGroupID();
// 获取消息的"JMSXGroupSeq"属性
int sequence = node.getGroupSequence();
if (groupId != null) {
// MessageGroupMap是一个Map,键是groupId,值是消费者
MessageGroupMap messageGroupOwners = getMessageGroupOwners();
// If we can own the first, then no-one else should own the
// rest.
if (sequence == 1) {
assignGroup(subscription, messageGroupOwners, node, groupId);
} else { // Make sure that the previous owner is still valid, we may
// need to become the new owner.
ConsumerId groupOwner;
// 根据groupId取出消费者
groupOwner = messageGroupOwners.get(groupId);
if (groupOwner == null) {
assignGroup(subscription, messageGroupOwners, node, groupId);
} else {
if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
// A group sequence < 1 is an end of group signal.
if (sequence < 0) {
messageGroupOwners.removeGroup(groupId);
subscription.getConsumerInfo().
setLastDeliveredSequenceId(subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
}
} else {
result = false;
}
}
}
} return result;
} // 往MessageGroupMap中插入键值对
protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners,
MessageReference n, String groupId) throws IOException {
messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
Message message = n.getMessage();
message.setJMSXGroupFirstForConsumer(true);
subs.getConsumerInfo().
setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
}