
RabbitMQ 消费者操作类
发布日期:2021-05-14 17:45:24
浏览次数:23
分类:精选文章
本文共 2861 字,大约阅读时间需要 9 分钟。
public class RabbitMqWorker{private readonly string rabbitMQ_HostConfig = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];private BackgroundWorker bg = new BackgroundWorker();private IConnection rabbitMqConnection;
public RabbitMqWorker(){ string[] rabbitConfig = rabbitMQ_HostConfig.Split(','); rabbitMqConnection = RabbitMQHelper.Instance(rabbitConfig[0], int.Parse(rabbitConfig[1]), rabbitConfig[2], rabbitConfig[3], rabbitConfig[4]).Connection();}public void Start(){ Program.loggor.Info("RabbitMqWorker initialized"); bg.DoWork += new DoWorkEventHandler(bg_DoWork); bg.WorkerSupportsCancellation = true; bg.RunWorkerCompleted += new RunWorkerCompletedEventHandler(bg_RunWorkerCompleted); RunBackgroundTask();}public void Stop(){ if (rabbitMqConnection != null) { rabbitMqConnection.Close(); } if (bg != null && bg.IsBusy) { bg.CancelAsync(); }}private void RunBackgroundTask(){ Program.loggor.Info("Starting background worker"); bg.RunWorkerAsync();}private void bg_DoWork(object sender, DoWorkEventArgs e){ try { Program.loggor.Info("Processing messages"); System.Threading.Thread.Sleep(1000); Parallel.Invoke( () => MessageConsumer("queue.sp.mipo.message"), () => MessageConsumer("queue.sp.mipo.message"), () => MessageConsumer("queue.sp.mipo.message"), () => MessageConsumer("queue.sp.mipo.message") ); } catch (Exception ex) { Program.loggor.Error("RabbitMqWorker", ex); EMailHelper.ToMail(ex); }}private void bg_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e){ if (e.Error != null) { Program.loggor.Error("RabbitMqWorker", e.Error); }}private void MessageConsumer(string queueName){ using (IModel channel = rabbitMqConnection.CreateModel()) { Dictionaryargs = new Dictionary (); args.Add("x-dead-letter-exchange", "exchange.dlx"); args.Add("x-dead-letter-routing-key", "rk.sp.mipo"); if (channel.QueueDeclare(queueName, true, false, false, args)) { channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, false, consumer); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); if (RabbitConsumeBll.Instance().Bus(message)) { channel.BasicAck(ea.DeliveryTag, false); } else { channel.BasicReject(ea.DeliveryTag, false); } } } }}
}
发表评论
最新留言
做的很好,不错不错
[***.243.131.199]2025年04月19日 12时05分03秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
http头部 Expect
2021-05-09
Hadoop(十六)之使用Combiner优化MapReduce
2021-05-09
《机器学习Python实现_10_06_集成学习_boosting_gbdt分类实现》
2021-05-09
CoreCLR源码探索(八) JIT的工作原理(详解篇)
2021-05-09
IOS开发Swift笔记16-错误处理
2021-05-10
flume使用中的一些常见错误解决办法 (地址已经使用)
2021-05-10
andriod 开发错误记录
2021-05-10
C语言编译错误列表
2021-05-10
看明白这两种情况,才敢说自己懂跨链! | 喵懂区块链24期
2021-05-10
张一鸣:创业7年,我经历的5件事
2021-05-10
git拉取远程指定分支代码
2021-05-10
《web安全入门》(四)前端开发基础Javascript
2021-05-10
python中列表 元组 字典 集合的区别
2021-05-10
python struct 官方文档
2021-05-10
Android DEX加固方案与原理
2021-05-10
Android Retrofit2.0 上传单张图片和多张图片
2021-05-10
iOS_Runtime3_动态添加方法
2021-05-10
Leetcode第557题---翻转字符串中的单词
2021-05-10