RabbitMQ 一个demo

时间:2023-03-09 17:04:23
RabbitMQ 一个demo

Code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.IO; namespace ConsoleDemo
{
class Program
{
/// <summary>
/// Entry point: starts three threads running <c>Provider.Write</c> and then
/// one thread running <c>Consumer.Read</c>.
/// </summary>
static void Main(string[] args)
{
    // Step 1: launch the producer threads.
    for (int producerIndex = 0; producerIndex < 3; producerIndex++)
    {
        Thread producerThread = new Thread(Provider.Write);
        producerThread.Start();
    }

    // Step 2: launch a single consumer thread. Original author's note: the
    // writing must not be multi-threaded, otherwise it conflicts
    // (presumably refers to the consumer's file output — confirm).
    Thread consumerThread = new Thread(Consumer.Read);
    consumerThread.Start();
}
} public class Provider
{
/// <summary>
/// Connects to the RabbitMQ broker on localhost, declares the "writeLog" queue
/// and publishes a run of consecutive integers to it, one message per integer,
/// echoing each sent value to the console.
/// </summary>
public static void Write()
{
    // NOTE(review): the loop bounds were missing in the original source
    // ("for (int i = ; i < ; i++)" does not compile). 0 and 100 are
    // placeholder values — confirm the intended range.
    const int firstValue = 0;
    const int endExclusive = 100;

    // NOTE(review): credentials are hard-coded for the demo; move them to
    // configuration before using this outside a local test.
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "qhong", Password = "hongdada", };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Same declaration arguments as the consumer uses for this queue.
        channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);

        for (int i = firstValue; i < endExclusive; i++)
        {
            string message = i.ToString();
            var body = Encoding.UTF8.GetBytes(message);

            // Empty exchange name: the message is routed by the routing key "writeLog".
            channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);
            Console.WriteLine("Program Sent {0}", message);
        }
    }
}
} public class Consumer
{
/// <summary>
/// Connects to the RabbitMQ broker on localhost, declares the "writeLog" queue
/// and subscribes to it. Each received body is decoded as UTF-8, passed to
/// <see cref="ExcuateWriteFile"/> and echoed to the console. The method then
/// waits for a line on standard input before the channel and connection are disposed.
/// </summary>
public static void Read()
{
    var connectionFactory = new ConnectionFactory();
    connectionFactory.HostName = "localhost";
    connectionFactory.UserName = "qhong";
    connectionFactory.Password = "hongdada";
    connectionFactory.VirtualHost = "/";

    using (var conn = connectionFactory.CreateConnection())
    {
        using (var model = conn.CreateModel())
        {
            model.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);

            var subscriber = new EventingBasicConsumer(model);
            subscriber.Received += (sender, delivery) =>
            {
                // Decode the payload, persist it first, then report it.
                string text = Encoding.UTF8.GetString(delivery.Body);
                ExcuateWriteFile(text);
                Console.WriteLine(" Receiver Received {0}", text);
            };

            // noAck: true is passed through unchanged from the original call.
            model.BasicConsume(queue: "writeLog", noAck: true, consumer: subscriber);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
/// <summary>
/// Appends <paramref name="i"/> as one line to the file d:\test.txt using
/// <see cref="Encoding.Unicode"/> (UTF-16).
/// </summary>
/// <param name="i">The text to append as a single line.</param>
public static void ExcuateWriteFile(string i)
{
    // In a verbatim (@"...") literal a backslash is not an escape character,
    // so the original @"d:\\test.txt" contained a doubled path separator.
    using (FileStream fs = new FileStream(@"d:\test.txt", FileMode.Append))
    using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode))
    {
        sw.WriteLine(i);
    }
}
}
}

分成 2 步执行:第一步,生产者往队列里面添加数据

第二步,消费者读取队列里面的数据并写入文件test.txt

http://www.cnblogs.com/ericli-ericli/p/5917018.html

http://www.cnblogs.com/piaolingzxh/p/5448927.html