Using RabbitMQ in .NET Core 3.1 Web API or MVC

Date: 2024-01-26 22:04:35

 

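First, create a connection unit of work.

Both the sender and the receiver can use it.

MessageMQUserName, MessageMQPassword, MessageMQHostName, and MessageMQQueueName correspond to your own RabbitMQ server's username, password, host name, and queue name. Because they are read with GetConnectionString, they belong in the ConnectionStrings section of the configuration.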
    // Requires: using Microsoft.Extensions.Configuration; using RabbitMQ.Client;
    public class RabbitMQClientUnit : RabbitMQClientIUnit
    {
        private readonly IConfiguration _configuration;

        public RabbitMQClientUnit(IConfiguration configuration)
        {
            _configuration = configuration;

            // Build the connection settings from the ConnectionStrings section.
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = _configuration.GetConnectionString("MessageMQUserName"),
                Password = _configuration.GetConnectionString("MessageMQPassword"),
                HostName = _configuration.GetConnectionString("MessageMQHostName"),
            };
            Connection = factory.CreateConnection();
            QueueName = _configuration.GetConnectionString("MessageMQQueueName");
            Channel = Connection.CreateModel();
        }

        public IConnection Connection { get; }
        public IModel Channel { get; }
        public string QueueName { get; }
    }
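The post does not show the RabbitMQClientIUnit interface or the configuration values themselves. The following is a minimal sketch of what they might look like, assuming the interface simply mirrors the three public properties of RabbitMQClientUnit. The ExampleConfiguration class and every value in it are hypothetical placeholders; in a real project the same keys would normally sit under ConnectionStrings in appsettings.json.

```csharp
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;

// Assumed shape of the interface, inferred from the members of RabbitMQClientUnit.
public interface RabbitMQClientIUnit
{
    IConnection Connection { get; }
    IModel Channel { get; }
    string QueueName { get; }
}

// Hypothetical helper (not part of the original post) that shows where the keys live.
public static class ExampleConfiguration
{
    public static IConfiguration Build()
    {
        // GetConnectionString("X") reads the key "ConnectionStrings:X".
        // All values below are placeholders, not settings taken from the post.
        var settings = new Dictionary<string, string>
        {
            ["ConnectionStrings:MessageMQUserName"] = "guest",
            ["ConnectionStrings:MessageMQPassword"] = "guest",
            ["ConnectionStrings:MessageMQHostName"] = "localhost",
            ["ConnectionStrings:MessageMQQueueName"] = "Message",
            // Read by the sender as the routing key; kept equal to the queue name.
            ["ConnectionStrings:MessageExchange"] = "Message",
        };

        return new ConfigurationBuilder()
            .AddInMemoryCollection(settings)
            .Build();
    }
}
```

Passing ExampleConfiguration.Build() to the RabbitMQClientUnit constructor would then attempt a connection to a broker on localhost, which is only useful for a quick local check.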

In the MessageRepository class, use the following method:

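        // Requires: using System; using System.Text;
        //           using Microsoft.Extensions.Configuration; using RabbitMQ.Client;
        private readonly RabbitMQClientIUnit _rabbitMQClientIUnit;
        private readonly IConfiguration _configuration;

        public MessageRepository(RabbitMQClientIUnit rabbitMQClientIUnit, IConfiguration configuration)
        {
            _rabbitMQClientIUnit = rabbitMQClientIUnit;
            _configuration = configuration;
        }

        /// <summary>
        /// Publishes a message to RabbitMQ.
        /// </summary>
        /// <param name="encryption">The message text to send, encoded as UTF-8.</param>
        public void RabbitMQPush(string encryption)
        {
            try
            {
                // Declare the queue (non-durable, non-exclusive, not auto-deleted) so it exists before publishing.
                _rabbitMQClientIUnit.Channel.QueueDeclare(_rabbitMQClientIUnit.QueueName, false, false, false, null);
                var sendBytes = Encoding.UTF8.GetBytes(encryption);
                // Publish the message. With the default exchange (""), the routing key must equal
                // the queue name, so the "MessageExchange" setting has to match the declared queue.
                _rabbitMQClientIUnit.Channel.BasicPublish("", _configuration.GetConnectionString("MessageExchange"), null, sendBytes);
                // Closing the channel here means this unit cannot publish again; this only works
                // if a new RabbitMQClientIUnit is created per call. Remove it if the unit is shared.
                _rabbitMQClientIUnit.Channel.Close();
            }
            catch (Exception ex)
            {
                // Keep the original exception as the inner exception so the cause is not lost.
                throw new ArgumentException("An error occurred; MQ push failed", ex);
            }
        }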

That completes the RabbitMQ sender.

 

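The receiver is a little more troublesome, and I took a few detours with it in Core 3.1. At first I wanted to use a console application as the receiver, but on Linux Console.ReadKey cannot be used when the program runs as a service: calling it makes the service fail at startup.

So I had to go back to a Core Web API or MVC project.

Write the RabbitListener class as follows: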

    // Requires: using System; using System.Text; using System.Threading; using System.Threading.Tasks;
    //           using Microsoft.Extensions.Hosting; using RabbitMQ.Client; using RabbitMQ.Client.Events;
    public class RabbitListener : IHostedService
    {
        private readonly RabbitMQClientIUnit _rabbitMQClientIUnit;
        private readonly MessageIService _messageIService;

        public RabbitListener(RabbitMQClientIUnit rabbitMQClientIUnit, MessageIService messageIService)
        {
            _rabbitMQClientIUnit = rabbitMQClientIUnit;
            _messageIService = messageIService;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }

        // Message-handling method, intended to be overridden.
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        // Registers the consumer that listens on the queue.
        public void Register()
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(_rabbitMQClientIUnit.Channel);
            // Raised whenever a message is received.
            consumer.Received += (ch, ea) =>
            {
                // Note: with RabbitMQ.Client 6.x, ea.Body is a ReadOnlyMemory<byte> and cannot be
                // passed to GetString directly, so convert it with ToArray() first.
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"Received message: {message}");
                // autoAck is false below, so acknowledge explicitly; otherwise the message stays unacknowledged.
                _rabbitMQClientIUnit.Channel.BasicAck(ea.DeliveryTag, false);
            };
            // "Message" must be the name of the queue the sender publishes to.
            _rabbitMQClientIUnit.Channel.BasicConsume("Message", false, consumer);
        }

        public void DeRegister()
        {
            _rabbitMQClientIUnit.Connection.Close();
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _rabbitMQClientIUnit.Connection.Close();
            return Task.CompletedTask;
        }
    }
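The Process method in the listener above is declared but never called from the Received handler. One possible way to connect the two is sketched below, assuming RabbitMQ.Client 6.x and manual acknowledgements. ConsumerWiring and its Attach method are hypothetical names introduced only for illustration, and rejecting failed messages without requeueing is a design choice of this sketch, not something the post prescribes.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical helper, not part of the original post.
public static class ConsumerWiring
{
    // Attaches a consumer to queueName and hands each message to the process callback.
    // The callback returns true when the message was handled successfully.
    public static EventingBasicConsumer Attach(IModel channel, string queueName, Func<string, bool> process)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            // ea.Body is a ReadOnlyMemory<byte> in RabbitMQ.Client 6.x.
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());

            bool handled;
            try
            {
                handled = process(message);
            }
            catch (Exception)
            {
                // Treat any exception from the callback as a failed message.
                handled = false;
            }

            if (handled)
            {
                // Tell the broker the message can be removed from the queue.
                channel.BasicAck(ea.DeliveryTag, false);
            }
            else
            {
                // Reject without requeueing so a bad message is not redelivered forever.
                channel.BasicNack(ea.DeliveryTag, false, false);
            }
        };

        // autoAck is false because acknowledgements are sent manually above.
        channel.BasicConsume(queueName, false, consumer);
        return consumer;
    }
}
```

Inside Register, a call such as ConsumerWiring.Attach(_rabbitMQClientIUnit.Channel, _rabbitMQClientIUnit.QueueName, Process) could stand in for the inline handler, provided a subclass overrides Process with real logic.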

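The last step, which is also the core of the whole receiver, is the registration.

In Startup, register the listener with the AddHostedService method: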

 

  services.AddHostedService<RabbitListener>();
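For context, the surrounding registrations might look roughly like the sketch below. The post does not say how RabbitMQClientIUnit or MessageIService are registered, so the singleton lifetimes are assumptions, and MessageService is a hypothetical implementation name. The fragment relies on the types defined earlier in the post and is not complete on its own.

```csharp
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        // Assumption: one shared connection and channel for the lifetime of the app.
        services.AddSingleton<RabbitMQClientIUnit, RabbitMQClientUnit>();

        // MessageService is a hypothetical implementation of MessageIService.
        services.AddSingleton<MessageIService, MessageService>();

        // The host calls StartAsync on startup and StopAsync on shutdown.
        services.AddHostedService<RabbitListener>();
    }
}
```

A hosted service is resolved from the root service provider, so the services it takes in its constructor generally should not be registered as scoped; that is the reason singletons are used in this sketch.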