ActiveMQ 中的链表

时间:2023-03-09 07:30:26
ActiveMQ 中的链表

ActiveMQ 中的消息在内存中时,以链表形式保存,以 PendingList 表示,每一个消息是 PendingNode。

PendingList 主要有2种实现:OrderedPendingList 和 PrioritizedPendingList

OrderedPendingList 就是一个双向链表,多了一个保存消息的 map:

public class OrderedPendingList implements PendingList {
//省略其他代码
private PendingNode root = null;
private PendingNode tail = null;
private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
}

而 PrioritizedPendingList,是维持了一个 OrderedPendingList 数组,消息根据优先级存放在对应的 OrderedPendingList 中。

public class PrioritizedPendingList implements PendingList {
//省略其他代码
private static final Integer MAX_PRIORITY = 10;
private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>(); public PrioritizedPendingList() {
for (int i = 0; i < MAX_PRIORITY; i++) {
this.lists[i] = new OrderedPendingList();
}
}
}

message cursor 的作用是遍历消息。

常用的 message cursor 分为 VMPendingMessageCursor 和 FilePendingMessageCursor,VMPendingMessageCursor 主要是对 VMPendingMessageCursor 做一个封装:

public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
//省略其他代码
private final PendingList list;
private Iterator<MessageReference> iter; public VMPendingMessageCursor(boolean prioritizedMessages) {
super(prioritizedMessages);
if (this.prioritizedMessages) {
this.list= new PrioritizedPendingList();
}else {
this.list = new OrderedPendingList();
}
}
}

而 FilePendingMessageCursor 就复杂的多,它会把消息刷到临时文件中:

// 省略其他代码
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
protected Broker broker;
// PListStoreImpl 临时文件
private final PListStore store;
// 内存中的消息
private PendingList memoryList;
// PListImpl
private PList diskList;
}

持久化消息使用的 message cursor 是 QueueStorePrefetch:

// 省略其他代码
class QueueStorePrefetch extends AbstractStoreCursor {
// 类型为:KahaDBTransactionStore$1
private final MessageStore store;
private final Broker broker;
}