消费ActiveMQ消息时如何保留消息顺序?

时间:2021-10-13 00:15:48

I have a .NET service that uses the ActiveMQ client. I have implemented a MessageListener with a transacted connection to consume the messages.

我有一个使用ActiveMQ客户端的.NET服务。我已经实现了一个带有事务连接的MessageListener来使用消息。

Occasionally, I get messages in a different order in which they were put onto the queue.

偶尔,我会以不同的顺序获取消息,并将消息放入队列。

Was it wrong to use a MessageListner? Is there a way to preserve the message order?

使用MessageListner是不对的?有没有办法保留消息顺序?

FYI: There is one producer putting messages on the queue and one consumer pulling messages off the queue.

仅供参考:有一个生产者将消息放入队列,一个消费者将消息从队列中拉出。

4 个解决方案

#1


You shouldn't have to do anything to maintain the order; that's one of the things that a message queue does for you. I would think that if you have a single consumer listening to the queue, and it is processing messages out of order, then you have either found a bug or the messages aren't enqueued in the order you think they are.

你不应该做任何事来维持秩序;这是消息队列为您做的事情之一。我认为如果你有一个消费者正在监听队列,并且它正在处理无序的消息,那么你要么发现了一个错误,要么消息没有按照你认为的顺序排队。

Also, there's this question from the ActiveMQ FAQ that might help.

此外,ActiveMQ FAQ中的这个问题可能有所帮助。

Edit: From reading the comments on duffymo's answer, it looks like you're overengineering a bit. In general, a message queue like ActiveMQ, MQ Series, joram, etc. have two characteristics: they deliver messages in the same order in which they are enqueued, and they guarantee message delivery. Sending a separate ACK message is redundant; it's a bit like committing a database transaction, then querying the same information back to double-check that the database actually stored it.

编辑:从阅读关于duffymo的答案的评论,看起来你有点过度工程。通常,像ActiveMQ,MQ Series,joram等消息队列具有两个特征:它们以与它们排队相同的顺序传递消息,并且它们保证消息传递。发送单独的ACK消息是多余的;这有点像提交数据库事务,然后再查询相同的信息以仔细检查数据库实际存储它。

Having said that, is your server multithreaded? If so it may possible for it to enqueue the response before it enqueues the ACK.

话虽如此,您的服务器是多线程的吗?如果是这样,它可能在响应排队之前将响应排入队列。

#2


More Info: The server I call is third party and I have no control over what messages it sends. I do know, based on the message ID's that they are put on in the right order. Also, in this system there is a difference between ACK and Acknowledge. The ACK is received when my original message is put on the the "outbound" queue. I then monitor an "inbound" queue for responses. I usually get an "Acknowledge" followed by a "Pass" or a "Fail". The correlation ID for these messages is the message ID from the ACK.

更多信息:我调用的服务器是第三方,我无法控制它发送的消息。我知道,根据消息ID,它们以正确的顺序放置。此外,在该系统中,ACK和Acknowledge之间存在差异。当我的原始消息放在“出站”队列上时,收到ACK。然后我监视“入站”队列以获得响应。我通常得到一个“确认”,然后是“通行证”或“失败”。这些消息的相关ID是来自ACK的消息ID。

I had originally used a message listener and responded to an OnMessge event. I abandoned this approach after realizing that, using this method, messages are delivered asynchronously and therefore in no particular order. So, I changed my code to poll using a timer (System.Threading.Timer) to call consumer.Receive() and get one message at a time. This works the way I want.

我最初使用了一个消息监听器并响应了一个OnMessge事件。我意识到,使用这种方法,消息是异步传递的,因此没有特定的顺序,我放弃了这种方法。因此,我将我的代码更改为使用计时器(System.Threading.Timer)进行轮询以调用consumer.Receive()并一次获取一条消息。这按照我想要的方式工作。

I open the consumer once when the service starts and I continuously poll for messages.

我在服务启动时打开消费者一次,并不断轮询消息。

#3


Why is message order important? A MessageListener shouldn't have to care.

为什么消息顺序很重要? MessageListener不应该关心。

If you need a correlation ID to match a response with a particular request, that's another matter. Is that what you mean?

如果您需要相关ID来匹配具有特定请求的响应,那么这是另一回事。你是这个意思吗?

#4


Everything Jason just said. A few other things to be careful of. You are keeping the consumer open for lots of messages right? You're not creating a consumer for a few messages then closing it? Only closing a consumer causes messages associated with a consumer to be put back onto a queue which can break order.

杰森刚才说的一切。还有一些需要注意的事情。您是否让消费者对大量消息保持开放?您是不是在为一些消息创建消费者然后关闭它?仅关闭消费者会导致与消费者相关联的消息被放回到可能破坏订单的队列中。

Is it related to rollbacks? (Are you rolling back any transactions?).

它与回滚有关吗? (你回滚任何交易吗?)。

Finally, you can always ensure order by using a Resequencer to reorder things for you.

最后,您可以通过使用Resequencer为您重新排序来确保订单。

#1


You shouldn't have to do anything to maintain the order; that's one of the things that a message queue does for you. I would think that if you have a single consumer listening to the queue, and it is processing messages out of order, then you have either found a bug or the messages aren't enqueued in the order you think they are.

你不应该做任何事来维持秩序;这是消息队列为您做的事情之一。我认为如果你有一个消费者正在监听队列,并且它正在处理无序的消息,那么你要么发现了一个错误,要么消息没有按照你认为的顺序排队。

Also, there's this question from the ActiveMQ FAQ that might help.

此外,ActiveMQ FAQ中的这个问题可能有所帮助。

Edit: From reading the comments on duffymo's answer, it looks like you're overengineering a bit. In general, a message queue like ActiveMQ, MQ Series, joram, etc. have two characteristics: they deliver messages in the same order in which they are enqueued, and they guarantee message delivery. Sending a separate ACK message is redundant; it's a bit like committing a database transaction, then querying the same information back to double-check that the database actually stored it.

编辑:从阅读关于duffymo的答案的评论,看起来你有点过度工程。通常,像ActiveMQ,MQ Series,joram等消息队列具有两个特征:它们以与它们排队相同的顺序传递消息,并且它们保证消息传递。发送单独的ACK消息是多余的;这有点像提交数据库事务,然后再查询相同的信息以仔细检查数据库实际存储它。

Having said that, is your server multithreaded? If so it may possible for it to enqueue the response before it enqueues the ACK.

话虽如此,您的服务器是多线程的吗?如果是这样,它可能在响应排队之前将响应排入队列。

#2


More Info: The server I call is third party and I have no control over what messages it sends. I do know, based on the message ID's that they are put on in the right order. Also, in this system there is a difference between ACK and Acknowledge. The ACK is received when my original message is put on the the "outbound" queue. I then monitor an "inbound" queue for responses. I usually get an "Acknowledge" followed by a "Pass" or a "Fail". The correlation ID for these messages is the message ID from the ACK.

更多信息:我调用的服务器是第三方,我无法控制它发送的消息。我知道,根据消息ID,它们以正确的顺序放置。此外,在该系统中,ACK和Acknowledge之间存在差异。当我的原始消息放在“出站”队列上时,收到ACK。然后我监视“入站”队列以获得响应。我通常得到一个“确认”,然后是“通行证”或“失败”。这些消息的相关ID是来自ACK的消息ID。

I had originally used a message listener and responded to an OnMessge event. I abandoned this approach after realizing that, using this method, messages are delivered asynchronously and therefore in no particular order. So, I changed my code to poll using a timer (System.Threading.Timer) to call consumer.Receive() and get one message at a time. This works the way I want.

我最初使用了一个消息监听器并响应了一个OnMessge事件。我意识到,使用这种方法,消息是异步传递的,因此没有特定的顺序,我放弃了这种方法。因此,我将我的代码更改为使用计时器(System.Threading.Timer)进行轮询以调用consumer.Receive()并一次获取一条消息。这按照我想要的方式工作。

I open the consumer once when the service starts and I continuously poll for messages.

我在服务启动时打开消费者一次,并不断轮询消息。

#3


Why is message order important? A MessageListener shouldn't have to care.

为什么消息顺序很重要? MessageListener不应该关心。

If you need a correlation ID to match a response with a particular request, that's another matter. Is that what you mean?

如果您需要相关ID来匹配具有特定请求的响应,那么这是另一回事。你是这个意思吗?

#4


Everything Jason just said. A few other things to be careful of. You are keeping the consumer open for lots of messages right? You're not creating a consumer for a few messages then closing it? Only closing a consumer causes messages associated with a consumer to be put back onto a queue which can break order.

杰森刚才说的一切。还有一些需要注意的事情。您是否让消费者对大量消息保持开放?您是不是在为一些消息创建消费者然后关闭它?仅关闭消费者会导致与消费者相关联的消息被放回到可能破坏订单的队列中。

Is it related to rollbacks? (Are you rolling back any transactions?).

它与回滚有关吗? (你回滚任何交易吗?)。

Finally, you can always ensure order by using a Resequencer to reorder things for you.

最后,您可以通过使用Resequencer为您重新排序来确保订单。