MSMQ消息队列

时间:2021-11-21 14:59:44

MSMQ全称MicroSoft Message Queue,微软消息队列,是在多个不同的应用之间实现相互通信的一种异步传输模式,相互通信的应用可以分布于同一台机器上,也可以分布于相连的网络空间中的任一位置。它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中;本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理。

消息Message是由通信的双方所需要传递的信息。

队列的类型主要包括一下几种:

“公共队列”在整个“消息队列”网络中复制,并且有可能由网络连接的所有站点访问。

“专用队列”不在整个网络中发布。相反,它们仅在所驻留的本地计算机上可用。专用队列只能由知道队列的完整路径名或标签的应用程序访问。

“管理队列”包含确认在给定“消息队列”网络中发送的消息回执的消息。指定希望 MessageQueue 组件使用的管理队列(如果有的话)。

“响应队列”包含目标应用程序接收到消息时返回给发送应用程序的响应消息。指定希望 MessageQueue 组件使用的响应队列(如果有的话)。

优点:稳定、消息优先级、脱机能力以及安全性,有保障的消息传递和执行许多业务处理的可靠的防故障机制。

缺点:MSMQ不适合于Client需要Server端实时交互情况.大量请求时候,响应延迟.

.NET编程

、命名空间 using System.Messaging;

、默认存储路径 C:\WINDOWS\system32\msmq\storage

、创建消息队列:MessageQueue mq = MessageQueue.Create(@".\Private$\LeeMSMQ");

、删除队列:MessageQueue.Delete(@".\Private$\LeeMSMQ");

、发送消息:MessageQueue mq = new MessageQueue(@".\Private$\LeeMSMQ");

            mq.Send("sayhello1,hello msmq!", "sayhello1");

            mq.Send("sayhello2,hello msmq!", "sayhello2");

、接受并删除消息:MessageQueue mq = new MessageQueue(@".\Private$\LeeMSMQ")

            Message msg = mq.Receive();//引用的队列中可用的第一条消息

、接受但不删除消息:Message msg = mq.Peek();

、删除所有消息: Message msg = mq.Purge();

、返回本机所有私有队列的消息

      //返回本机所有私有队列的消息

             foreach (MessageQueue mq in MessageQueue.GetPrivateQueuesByMachine("liyanping"))

            {

                mq.Formatter = new XmlMessageFormatter(new string[] { "System.String" });

                Message[] msg = mq.GetAllMessages();

                foreach (Message m in msg)

                {

                    Console.WriteLine("label:{0},body:{1}", m.Label, m.Body);

                }

            }

、返回指定队列的消息

            if (MessageQueue.Exists(@".\Private$\LeeMSMQ"))//判断私有消息是否存在

            {

                using (MessageQueue mq = new MessageQueue(@".\Private$\LeeMSMQ"))

                {

                    mq.Formatter = new XmlMessageFormatter(new string[] { "System.String" });//设置消息队列格式化器

                    Message msg = mq.Receive();//接收消息

                    Console.WriteLine("label:{0},body: {1}", msg.Label, msg.Body);//输出消息

                    MessageQueue.Delete(@".\Private$\LeeMSMQ");

                }

            } 

下面开始消息队列实战开发

msmq客户端:包含了消息基类和发送消息send方法以及抽象的序列化数据的方法

 public class BaseMsg
{
public BaseMsg()
{
Recoverable = true;
PriorityLevel = MessagePriority.Normal;
} public bool Recoverable { get; set; } public MessagePriority PriorityLevel { get; set; }
}

然后实现client发送msmq及序列化消息的基类

  public abstract class MSMQClient<T>
where T : BaseMsg
{
private readonly string queueName;
protected MSMQClient(string appSettingName)
{
ConfigurationManager.RefreshSection("appSettings");
this.queueName = ConfigurationManager.AppSettings[appSettingName];
} /// <summary>
/// 添加短信到消息队列
/// </summary>
/// <param name="entity"></param>
public void Send(T entity)
{
if (entity == null)
throw new ArgumentNullException("发送对象不能为空");
if (string.IsNullOrEmpty(queueName))
{
throw new ArgumentNullException("队列名称不能为空");
} var msg = Serialize(entity);
var queue = new MessageQueue(queueName);
queue.Send(msg);
} protected abstract Message Serialize(T msg);
}

msmq服务端:消息处理的公共接口、Msmq监听

 public interface IMessageProcessor
{
/// <summary>
/// 处理方法
/// </summary>
/// <param name="o"></param>
void Process(object o);
}

Msmq的监听

 class MSMQListener
{
private readonly MessageQueue queue;
private readonly WaitHandle[] waiteHandle = new WaitHandle[];
public event MessageReceivedEventHandler MessageReceived; /// <summary>
/// 构造函数
/// </summary>
/// <param name="queuePath">要监控队列名称</param>
public MSMQListener(string queuePath)
{
queue = new MessageQueue(queuePath);
} /// <summary>
/// 开启监控
/// </summary>
public void Start()
{
queue.ReceiveCompleted += OnReceiveCompleted;
StartListening();
} /// <summary>
/// 关闭监控
/// </summary>
public void Stop()
{
queue.ReceiveCompleted -= OnReceiveCompleted;
StopListening();
} private void StartListening()
{
waiteHandle[] = queue.BeginReceive().AsyncWaitHandle;
} private void StopListening()
{
try
{
waiteHandle[].Close();
}
catch (Exception ex)
{
Log.Error("停止监控信号量异常1", ex);
}
} private void FireRecieveEvent(object body)
{
if (MessageReceived != null)
{
MessageReceived(this, new MessageEventArgs(body));
}
} private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
try
{
var msg = queue.EndReceive(e.AsyncResult);
msg.Formatter = new ActiveXMessageFormatter(); StartListening(); FireRecieveEvent(msg.Body);
}
catch (Exception ex)
{
Log.Error("OnReceive Error", ex);
}
}
} /// <summary>
/// 消息接收事件
/// </summary>
/// <param name="sender"></param>
/// <param name="args"></param>
public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs args); /// <summary>
/// 消息参数
/// </summary>
public class MessageEventArgs : EventArgs
{
private readonly object messageBody; /// <summary>
/// 消息体
/// </summary>
public object MessageBody
{
get { return messageBody; }
} /// <summary>
/// 构造函数
/// </summary>
/// <param name="body">消息体</param>
public MessageEventArgs(object body)
{
messageBody = body;
}
}