
实现延迟消息队列
BaseQueue BaseTask
发布日期:2021-05-09 04:06:25
浏览次数:9
分类:博客文章
本文共 9017 字,大约阅读时间需要 30 分钟。
- 交流个人博客交流群:580749909 , 顺便推广一下自己和伙伴一起建立wpf交流群:130108655。
- 简要
因为在偶然的一次机会下,公司让我着手开发一个数据分发端基于socket通讯的一个中间件。主要用来解决向客户端分发数据的问题,后来多了一个需求就是未付费的用户拿到的数据是有延迟的。
而付费用户则是正常的。这个时候在网上搜了很久没有找到合适的解决方案,其实能解决这个问题的方案有很多比如说用到一些大厂贡献的xxMQ中间件之类的,确实能解决问题。但是目前项目比较小
根本用不上这么重的框架,然后又搜索了半天没有暂时没有发现有人用c#来实现,所以才动手写了这个方案。
- 思路
这个方案是借鉴了另一位博主的开发思路,受到这位博主的启发然后根据自己的理解写了这个方案。附上该博主的链接地址:
在此我就不多赘述里面的内容了。
- 代码
首先写一个方案要理清楚自己的项目结构,我做了如下分层。
- Interfaces , 这层里主要约束延迟消息队列的队列和消息任务行。
1 public interface IRingQueue2 { 3 /// 4 /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles] 5 /// 6 /// The specified task is executed after N seconds. 7 /// Definitions of callback 8 void Add(long delayTime,Actionaction); 9 10 /// 11 /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]12 /// 13 /// The specified task is executed after N seconds.14 /// Definitions of callback.15 /// Parameters used in the callback function.16 void Add(long delayTime, Actionaction, T data);17 18 /// 19 /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]20 /// 21 /// 22 /// Definitions of callback23 /// Parameters used in the callback function.24 /// Task ID, used when deleting tasks.25 void Add(long delayTime, Actionaction, T data, long id);26 27 /// 28 /// Remove tasks [need to know: where the task is, which specific task].29 /// 30 /// Task slot location31 /// Task ID, used when deleting tasks.32 void Remove(long id);33 34 ///35 /// Launch queue.36 /// 37 void Start();38 }
1 public interface ITask2 {3 }
- Achieves,这层里实现之前定义的接口,这里写成抽象类是为了后面方便扩展。
1 public abstract class BaseQueue: IRingQueue 2 { 3 private long _pointer = 0L; 4 private ConcurrentBag >[] _arraySlot; 5 private int ArrayMax; 6 7 /// 8 /// Ring queue. 9 /// 10 public ConcurrentBag>[] ArraySlot 11 { 12 get { return _arraySlot ?? (_arraySlot = new ConcurrentBag >[ArrayMax]); } 13 } 14 15 public BaseQueue(int arrayMax) 16 { 17 if (arrayMax < 60 && arrayMax % 60 == 0) 18 throw new Exception("Ring queue length cannot be less than 60 and is a multiple of 60 ."); 19 20 ArrayMax = arrayMax; 21 } 22 23 public void Add(long delayTime, Action action) 24 { 25 Add(delayTime, action, default(T)); 26 } 27 28 public void Add(long delayTime,Action action,T data) 29 { 30 Add(delayTime, action, data,0); 31 } 32 33 public void Add(long delayTime, Action action, T data,long id) 34 { 35 NextSlot(delayTime, out long cycle, out long pointer); 36 ArraySlot[pointer] = ArraySlot[pointer] ?? (ArraySlot[pointer] = new ConcurrentBag >()); 37 var baseTask = new BaseTask (cycle, action, data,id); 38 ArraySlot[pointer].Add(baseTask); 39 } 40 41 /// 42 /// Remove tasks based on ID. 43 /// 44 /// 45 public void Remove(long id) 46 { 47 try 48 { 49 Parallel.ForEach(ArraySlot, (ConcurrentBag> collection, ParallelLoopState state) => 50 { 51 var resulTask = collection.FirstOrDefault(p => p.Id == id); 52 if (resulTask != null) 53 { 54 collection.TryTake(out resulTask); 55 state.Break(); 56 } 57 }); 58 } 59 catch (Exception e) 60 { 61 Console.WriteLine(e); 62 } 63 } 64 65 public void Start() 66 { 67 while (true) 68 { 69 RightMovePointer(); 70 Thread.Sleep(1000); 71 Console.WriteLine(DateTime.Now.ToString()); 72 } 73 } 74 75 /// 76 /// Calculate the information of the next slot. 77 /// 78 /// Delayed execution time. 79 /// Number of turns. 80 /// Task location. 81 private void NextSlot(long delayTime, out long cycle,out long index) 82 { 83 try 84 { 85 var circle = delayTime / ArrayMax; 86 var second = delayTime % ArrayMax; 87 var current_pointer = GetPointer(); 88 var queue_index = 0L; 89 90 if (delayTime - ArrayMax > ArrayMax) 91 { 92 circle = 1; 93 } 94 else if (second > ArrayMax) 95 { 96 circle += 1; 97 } 98 99 if (delayTime - circle * ArrayMax < ArrayMax)100 {101 second = delayTime - circle * ArrayMax;102 }103 104 if (current_pointer + delayTime >= ArrayMax)105 {106 cycle = (int)((current_pointer + delayTime) / ArrayMax);107 if (current_pointer + second - ArrayMax < 0)108 {109 queue_index = current_pointer + second;110 }111 else if (current_pointer + second - ArrayMax > 0)112 {113 queue_index = current_pointer + second - ArrayMax;114 }115 }116 else117 {118 cycle = 0;119 queue_index = current_pointer + second;120 }121 index = queue_index;122 }123 catch (Exception e)124 {125 Console.WriteLine(e);126 throw;127 }128 }129 130 ///131 /// Get the current location of the pointer.132 /// 133 ///134 private long GetPointer()135 {136 return Interlocked.Read(ref _pointer);137 }138 139 /// 140 /// Reset pointer position.141 /// 142 private void ReSetPointer()143 {144 Interlocked.Exchange(ref _pointer, 0);145 }146 147 ///148 /// Pointer moves clockwise.149 /// 150 private void RightMovePointer()151 {152 try153 {154 if (GetPointer() >= ArrayMax - 1)155 {156 ReSetPointer();157 }158 else159 {160 Interlocked.Increment(ref _pointer);161 }162 163 var pointer = GetPointer();164 var taskCollection = ArraySlot[pointer];165 if (taskCollection == null || taskCollection.Count == 0) return;166 167 Parallel.ForEach(taskCollection, (BaseTasktask) =>168 {169 if (task.Cycle > 0)170 {171 task.SubCycleNumber();172 }173 174 if (task.Cycle <= 0)175 {176 taskCollection.TryTake(out task);177 task.TaskAction(task.Data);178 }179 });180 }181 catch (Exception e)182 {183 Console.WriteLine(e);184 throw;185 }186 }187 }
1 public class BaseTask: ITask 2 { 3 private long _cycle; 4 private long _id; 5 private T _data; 6 7 public Action TaskAction { get; set; } 8 9 public long Cycle10 {11 get { return Interlocked.Read(ref _cycle); }12 set { Interlocked.Exchange(ref _cycle, value); }13 }14 15 public long Id16 {17 get { return _id; }18 set { _id = value; }19 }20 21 public T Data22 {23 get { return _data; }24 set { _data = value; }25 }26 27 public BaseTask(long cycle, Action action, T data,long id)28 {29 Cycle = cycle;30 TaskAction = action;31 Data = data;32 Id = id;33 }34 35 public BaseTask(long cycle, Action action,T data)36 {37 Cycle = cycle;38 TaskAction = action;39 Data = data;40 }41 42 public BaseTask(long cycle, Action action)43 {44 Cycle = cycle;45 TaskAction = action;46 }47 48 public void SubCycleNumber()49 {50 Interlocked.Decrement(ref _cycle);51 }52 }
- Logic,这层主要实现调用逻辑,调用者最终只需要关心把任务放进队列并指定什么时候执行就行了,根本不需要关心其它的任何信息。
1 public static void Start() 2 { 3 //1.Initialize queues of different granularity. 4 IRingQueueminuteRingQueue = new MinuteQueue (); 5 6 //2.Open thread. 7 var lstTasks = new List 8 { 9 Task.Factory.StartNew(minuteRingQueue.Start)10 };11 12 //3.Add tasks performed in different periods.13 minuteRingQueue.Add(5, new Action ((NewsModel newsObj) =>14 {15 Console.WriteLine(newsObj.News);16 }), new NewsModel() { News = "Trump's visit to China!" });17 18 minuteRingQueue.Add(10, new Action ((NewsModel newsObj) =>19 {20 Console.WriteLine(newsObj.News);21 }), new NewsModel() { News = "Putin Pu's visit to China!" });22 23 minuteRingQueue.Add(60, new Action ((NewsModel newsObj) =>24 {25 Console.WriteLine(newsObj.News);26 }), new NewsModel() { News = "Eisenhower's visit to China!" });27 28 minuteRingQueue.Add(120, new Action ((NewsModel newsObj) =>29 {30 Console.WriteLine(newsObj.News);31 }), new NewsModel() { News = "Xi Jinping's visit to the US!" });32 33 //3.Waiting for all tasks to complete is usually not completed. Because there is an infinite loop.34 //F5 Run the program and see the effect.35 Task.WaitAll(lstTasks.ToArray());36 Console.Read();37 }
- Models,这层就是用来在延迟任务中带入的数据模型类而已了。自己用的时候换成任意自定义类型都可以。
- 截图
发表评论
最新留言
路过,博主的博客真漂亮。。
[***.116.15.85]2025年04月03日 18时53分24秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
PostgreSQL查询表名称及表结构
2021-05-09
如何使用google搜索?
2021-05-09
Redis分布式锁的正确实现方式
2021-05-09
设计模式-抽象工厂模式
2021-05-09
IntelliJ IDEA 中,项目文件右键菜单没有svn选项解决办法
2021-05-09
IDEA 调试Java代码的两个技巧
2021-05-09
重新温习软件设计之路(4)
2021-05-09
MySQL数据库与python交互
2021-05-09
python如何对字符串进行html转义与反转义?
2021-05-09
开发小白也毫无压力的hexo静态博客建站全攻略 - 躺坑后亲诉心路历程
2021-05-09
golang基础--类型与变量
2021-05-09
深入理解JavaScript函数
2021-05-09
【spring源码系列】之【xml解析】
2021-05-09
(在模仿中精进数据可视化07)星球研究所大坝分布可视化
2021-05-09
(数据科学学习手札27)sklearn数据集分割方法汇总
2021-05-09
(数据科学学习手札40)tensorflow实现LSTM时间序列预测
2021-05-09
8 个警示和学习的 5 个阶段
2021-05-09
从零开始学安全(十六)● Linux vim命令
2021-05-09
阿里巴巴Json工具-Fastjson教程
2021-05-09