RabbitMQ 消费者操作类
发布日期:2021-05-14 17:45:24 浏览次数:23 分类:精选文章

本文共 2861 字,大约阅读时间需要 9 分钟。

public class RabbitMqWorker{private readonly string rabbitMQ_HostConfig = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];private BackgroundWorker bg = new BackgroundWorker();private IConnection rabbitMqConnection;

public RabbitMqWorker(){    string[] rabbitConfig = rabbitMQ_HostConfig.Split(',');    rabbitMqConnection = RabbitMQHelper.Instance(rabbitConfig[0], int.Parse(rabbitConfig[1]),                                               rabbitConfig[2], rabbitConfig[3], rabbitConfig[4]).Connection();}public void Start(){    Program.loggor.Info("RabbitMqWorker initialized");    bg.DoWork += new DoWorkEventHandler(bg_DoWork);    bg.WorkerSupportsCancellation = true;    bg.RunWorkerCompleted += new RunWorkerCompletedEventHandler(bg_RunWorkerCompleted);    RunBackgroundTask();}public void Stop(){    if (rabbitMqConnection != null)    {        rabbitMqConnection.Close();    }    if (bg != null && bg.IsBusy)    {        bg.CancelAsync();    }}private void RunBackgroundTask(){    Program.loggor.Info("Starting background worker");    bg.RunWorkerAsync();}private void bg_DoWork(object sender, DoWorkEventArgs e){    try    {        Program.loggor.Info("Processing messages");        System.Threading.Thread.Sleep(1000);        Parallel.Invoke(            () => MessageConsumer("queue.sp.mipo.message"),            () => MessageConsumer("queue.sp.mipo.message"),            () => MessageConsumer("queue.sp.mipo.message"),            () => MessageConsumer("queue.sp.mipo.message")        );    }    catch (Exception ex)    {        Program.loggor.Error("RabbitMqWorker", ex);        EMailHelper.ToMail(ex);    }}private void bg_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e){    if (e.Error != null)    {        Program.loggor.Error("RabbitMqWorker", e.Error);    }}private void MessageConsumer(string queueName){    using (IModel channel = rabbitMqConnection.CreateModel())    {        Dictionary
args = new Dictionary
(); args.Add("x-dead-letter-exchange", "exchange.dlx"); args.Add("x-dead-letter-routing-key", "rk.sp.mipo"); if (channel.QueueDeclare(queueName, true, false, false, args)) { channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, false, consumer); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); if (RabbitConsumeBll.Instance().Bus(message)) { channel.BasicAck(ea.DeliveryTag, false); } else { channel.BasicReject(ea.DeliveryTag, false); } } } }}

}

上一篇:RabbitMQ配置
下一篇:RabbitMQ 消息生产者操作类

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2025年04月19日 12时05分03秒