C#代码实现阿里云消息服务MNS消息监听
发布日期:2021-05-09 01:18:07 浏览次数:18 分类:博客文章

本文共 23566 字,大约阅读时间需要 78 分钟。

十年河东,十年河西,莫欺少年穷

学无止境,精益求精

近几天一直都在看阿里云的IOT云服务及消息队列MNS,一头雾水好几天了,直到今天,总算有点收获了,记录下来,方便以后查阅。

首先借用阿里云的一张图来说明:设备是如何通过云服务平台和企业服务器‘通话’的

针对此图,作如下说明:

1、物联网平台作为中间组件,主要是通过消息队列MNS来实现设备和企业服务器对话的,具体可描述为:

1.1、设备发送指令至物联网平台的MNS队列,MNS队列将设备指令收录。需要说明的是:设备端发送指令的程序由嵌入式开发人员开发,例如使用C语言

1.2、企业通过C#、JAVA、PHP等高级语言开发人员开发监听程序,当监听到MNS队列中的设备指令时,获取指令,做相关业务处理,并发送新的设备指令至MNS队列。【例如发送快递柜关门的指令】

1.3、企业发送的指令被MNS收录,设备同样通过监听程序获取企业服务器发送的关门指令,收到关门指令的设备执行相关指令,完成自动关门操作。

以上便是设备与企业服务器之间的对话过程

下面列出C#的监听MNS代码【需要MNS C# SDK 的支持】注意:消息是经过EncodeBase64编码的,接收消息要解码,发送消息要编码

异步监听:

using System;
using System.Threading;
using System.Threading.Tasks;
using Aliyun.MNS;
using Aliyun.MNS.Model;
using IotCommon;
using IotDtos.MongodbDtos;
using IotService.Device;
using IotService.MongoDb;

namespace IotListener
{
    /// <summary>
    /// Console host that receives device messages from an MNS queue asynchronously,
    /// dispatches each one to the matching device-response handler, logs it to MongoDB
    /// and then deletes it from the queue.
    /// </summary>
    class Program
    {
        private static MongoLogService _logService;

        /// <summary>Receipt handle of the most recently processed message (kept for existing callers).</summary>
        public static string _receiptHandle;

        public static DeviceResponseService service = new DeviceResponseService();

        /// <summary>Queue that every receive request is issued on and completed against.</summary>
        public static Queue nativeQueue;

        /// <summary>
        /// Entry point: sets up the log service, then keeps issuing asynchronous receive requests forever.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // NOTE(review): the collection name is computed once at start-up, so a process that
            // runs past midnight keeps writing to the start-up day's collection - confirm intended.
            LogstoreDatabaseSettings st = new LogstoreDatabaseSettings() { LogsCollectionName = "LogsForDg_" + DateTime.Now.ToString("yyyyMMdd") };
            _logService = new MongoLogService(st);

            while (true)
            {
                try
                {
                    // Build the client and queue once instead of on every pass. The callbacks
                    // read the static nativeQueue, so replacing it each iteration meant a receive
                    // could be completed on a different Queue instance than the one that started it.
                    // Creation stays inside the try so a failure is reported and retried.
                    if (nativeQueue == null)
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
                        nativeQueue = client.GetNativeQueue(IotParm._queueName);
                    }

                    // NOTE(review): a new receive request is started roughly every millisecond
                    // without waiting for earlier ones to complete - confirm this request rate is intended.
                    for (int i = 0; i < IotParm._receiveTimes; i++)
                    {
                        ReceiveMessageRequest request = new ReceiveMessageRequest(1);
                        nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
                        Thread.Sleep(1);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                }
            }
        }

        /// <summary>
        /// Completion callback for <c>BeginReceiveMessage</c>: decodes the Base64 body, dispatches it
        /// by its "method" value, logs it when a method is present and deletes the message.
        /// </summary>
        /// <param name="ar">Asynchronous result handed back by the MNS client.</param>
        public static void ListenerCallback(IAsyncResult ar)
        {
            try
            {
                Message message = nativeQueue.EndReceiveMessage(ar).Message;
                string json = Base64Helper.DecodeBase64(message.Body);
                Console.WriteLine("Message: {0}", json);
                Console.WriteLine("----------------------------------------------------\n");

                var methodValue = JsonKeyHelper.GetJsonValue(json, "method");
                DeviceResponse(methodValue, json);
                if (!string.IsNullOrEmpty(methodValue))
                {
                    _logService.Create(new LogsForDgModel { CreateTime = DateTime.Now, data = json, methodNo = methodValue });
                }

                // Delete using a local copy of the handle. Callbacks run concurrently, so reading
                // the shared static field back could delete a message owned by another callback.
                string receiptHandle = message.ReceiptHandle;
                _receiptHandle = receiptHandle;
                nativeQueue.DeleteMessage(receiptHandle);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Receive message failed, exception info: " + ex.Message);
            }
        }

        /// <summary>
        /// Routes a device report to the handler that matches its method name.
        /// Unknown (or null) method names are ignored.
        /// </summary>
        /// <param name="method">Value of the "method" key in the message.</param>
        /// <param name="message">Decoded JSON message body.</param>
        public static void DeviceResponse(string method, string message)
        {
            switch (method)
            {
                case "doorClosedReport": service.doorClosedReportResponse(message); break;
                case "doorOpenReport": service.doorOpenReportResponse(message); break;
                case "deviceStartReportToCloud": service.deviceStartReportToCloudResponse(message); break;
                case "qryDeviceConfig": service.qryDeviceConfigResponse(message); break;
                case "devicePingToCloud": service.devicePingToCloudResponse(message); break;
                case "deviceFatalReport": service.deviceFatalReportResponse(message); break;
                case "deviceVersionReport": service.deviceVersionReportResponse(message); break;
                case "deviceFirmwareData": service.deviceFirmwareDataResponse(message); break;
                case "deviceLocationReport": service.deviceLocationReportResponse(message); break;
            }
        }
    }
}
View Code

同步监听:

using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MnsListener
{
    /// <summary>
    /// Console host that polls an MNS queue synchronously, prints each message
    /// (including its Base64-decoded body) and deletes it.
    /// </summary>
    class Program
    {
        #region Private Properties
        private const string _accessKeyId = "";
        private const string _secretAccessKey = "";
        private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
        private const string _stsToken = null;
        private const string _queueName = "Sub";
        private const string _queueNamePrefix = "my";
        private const int _receiveTimes = 1;
        private const int _receiveInterval = 2;
        private const int batchSize = 6;
        private static string _receiptHandle;
        #endregion

        /// <summary>
        /// Entry point: receives, prints and deletes messages in an endless loop.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Created lazily and then reused: the original rebuilt the client and queue on every pass.
            Queue nativeQueue = null;

            while (true)
            {
                try
                {
                    if (nativeQueue == null)
                    {
                        IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
                        nativeQueue = client.GetNativeQueue(_queueName);
                    }

                    for (int i = 0; i < _receiveTimes; i++)
                    {
                        var receiveMessageResponse = nativeQueue.ReceiveMessage(3);
                        Console.WriteLine("Receive message successfully, status code: {0}", receiveMessageResponse.HttpStatusCode);
                        Console.WriteLine("----------------------------------------------------");

                        Message message = receiveMessageResponse.Message;
                        string decodedBody = DecodeBase64(message.Body);
                        Console.WriteLine("MessageId: {0}", message.Id);
                        Console.WriteLine("ReceiptHandle: {0}", message.ReceiptHandle);
                        Console.WriteLine("MessageBody: {0}", message.Body);
                        // The decoded text was computed but never shown in the original.
                        Console.WriteLine("DecodedBody: {0}", decodedBody);
                        Console.WriteLine("MessageBodyMD5: {0}", message.BodyMD5);
                        Console.WriteLine("EnqueueTime: {0}", message.EnqueueTime);
                        Console.WriteLine("NextVisibleTime: {0}", message.NextVisibleTime);
                        Console.WriteLine("FirstDequeueTime: {0}", message.FirstDequeueTime);
                        Console.WriteLine("DequeueCount: {0}", message.DequeueCount);
                        Console.WriteLine("Priority: {0}", message.Priority);
                        Console.WriteLine("----------------------------------------------------\n");

                        _receiptHandle = message.ReceiptHandle;
                        nativeQueue.DeleteMessage(_receiptHandle);
                        Thread.Sleep(_receiveInterval);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Receive message failed, exception info: " + ex.Message);
                }
            }
        }

        /// <summary>
        /// Encodes text as Base64 using the named character encoding.
        /// </summary>
        /// <param name="code">Text to encode.</param>
        /// <param name="code_type">Encoding name passed to <c>Encoding.GetEncoding</c>; defaults to "utf-8".</param>
        /// <returns>The Base64 string, or <paramref name="code"/> unchanged if the conversion throws.</returns>
        public static string EncodeBase64(string code, string code_type = "utf-8")
        {
            byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
            try
            {
                return Convert.ToBase64String(bytes);
            }
            catch
            {
                return code;
            }
        }

        /// <summary>
        /// Decodes Base64 text using the named character encoding.
        /// </summary>
        /// <param name="code">Base64 text to decode.</param>
        /// <param name="code_type">Encoding name passed to <c>Encoding.GetEncoding</c>; defaults to "utf-8".</param>
        /// <returns>
        /// The decoded text, or <paramref name="code"/> unchanged when it is not valid Base64
        /// or the bytes cannot be turned into a string.
        /// </returns>
        public static string DecodeBase64(string code, string code_type = "utf-8")
        {
            byte[] bytes;
            try
            {
                // The original ran this outside its try block, so a body that was not valid
                // Base64 threw FormatException instead of reaching the "return input" fallback.
                bytes = Convert.FromBase64String(code);
            }
            catch (FormatException)
            {
                return code;
            }

            try
            {
                return Encoding.GetEncoding(code_type).GetString(bytes);
            }
            catch
            {
                return code;
            }
        }
    }
}
View Code

发送消息:

using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MnsSendMsg
{
    /// <summary>
    /// Console program that sends one Base64-encoded message to an MNS queue.
    /// </summary>
    class Program
    {
        #region Private Properties
        private const string _accessKeyId = "";
        private const string _secretAccessKey = "";
        private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
        private const string _stsToken = null;
        private const string _queueName = "Sub";
        private const string _queueNamePrefix = "my";
        private const int _receiveTimes = 1;
        private const int _receiveInterval = 2;
        private const int batchSize = 6;
        private static string _receiptHandle;
        #endregion

        /// <summary>
        /// Entry point: encodes a sample text, sends it with a two-second delivery delay
        /// and prints the outcome.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);

                // 1. Get the queue instance.
                var nativeQueue = client.GetNativeQueue(_queueName);

                // The literal was broken across two lines in the listing, which does not compile;
                // it is rejoined here without the line break.
                var sendMessageRequest = new SendMessageRequest(EncodeBase64("阿里云计算"));
                sendMessageRequest.DelaySeconds = 2;

                var sendMessageResponse = nativeQueue.SendMessage(sendMessageRequest);
                Console.WriteLine("Send message successfully,{0}", sendMessageResponse.ToString());
                Thread.Sleep(2000);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Send message failed, exception info: " + ex.Message);
            }
        }

        /// <summary>
        /// Encodes text as Base64 using the named character encoding.
        /// </summary>
        /// <param name="code">Text to encode.</param>
        /// <param name="code_type">Encoding name passed to <c>Encoding.GetEncoding</c>; defaults to "utf-8".</param>
        /// <returns>The Base64 string, or <paramref name="code"/> unchanged if the conversion throws.</returns>
        public static string EncodeBase64(string code, string code_type = "utf-8")
        {
            byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
            try
            {
                return Convert.ToBase64String(bytes);
            }
            catch
            {
                return code;
            }
        }

        /// <summary>
        /// Decodes Base64 text using the named character encoding.
        /// </summary>
        /// <param name="code">Base64 text to decode.</param>
        /// <param name="code_type">Encoding name passed to <c>Encoding.GetEncoding</c>; defaults to "utf-8".</param>
        /// <returns>
        /// The decoded text, or <paramref name="code"/> unchanged when it is not valid Base64
        /// or the bytes cannot be turned into a string.
        /// </returns>
        public static string DecodeBase64(string code, string code_type = "utf-8")
        {
            byte[] bytes;
            try
            {
                // The original ran this outside its try block, so invalid Base64 threw
                // FormatException instead of reaching the "return input" fallback.
                bytes = Convert.FromBase64String(code);
            }
            catch (FormatException)
            {
                return code;
            }

            try
            {
                return Encoding.GetEncoding(code_type).GetString(bytes);
            }
            catch
            {
                return code;
            }
        }
    }
}
View Code

关于MNS C# SDK下载,可以去阿里云:

关于MNS队列,主题,主题订阅相关知识:

关于阿里云AMQP队列接入,可以查询:

关于阿里云物联网平台,请查阅:

最后:

高级异步侦听

using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Iot.Model.V20180120;
using Aliyun.MNS;
using Aliyun.MNS.Model;
using Iot.Common;
using Iot.Dal.WA_Device;
using Iot.Factory;
using Iot.Model.WA_Device;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace WaIotListener
{
    /// <summary>
    /// Console host that listens on the WA device queue. Each callback handles one message
    /// and then issues the next receive request, so the listener keeps itself running.
    /// </summary>
    class Program
    {
        static IClientProfile clientProfile = DefaultProfile.GetProfile("cn-shanghai", IotParm._accessKeyId, IotParm._secretAccessKey);
        static string _receiptHandle;
        static Queue nativeQueue;
        static string Key = CommEnum.DeviceTypeEnm.WA.ToString();
        static string Methods = ConfigurationManager.AppSettings["Methods"];
        static ReceiveMessageRequest request = new ReceiveMessageRequest(1);

        // Commands whose device replies are stored for WA_Door.ReceiveDeviceResponse.
        // The generic argument was lost in the published listing; string follows from Split(...).ToList().
        static List<string> lst = new List<string>();

        /// <summary>
        /// Entry point: loads the command list from the "Methods" app setting, opens the queue
        /// and starts the first asynchronous receive. Blocks on a key press to keep the process alive.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                // A missing "Methods" setting used to throw here and silently prevent the listener from starting.
                lst = string.IsNullOrEmpty(Methods) ? new List<string>() : Methods.Split('_').ToList();

                IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
                nativeQueue = client.GetNativeQueue(IotParm.WA_queueName);
                Task.Run(delegate { nativeQueue.BeginReceiveMessage(request, ListenerCallback, null); });
            }
            catch (Exception ex)
            {
                // Report the cause instead of discarding the exception.
                Console.WriteLine("Receive message failed, exception info: " + ex.Message);
            }
            Console.ReadKey();
        }

        /// <summary>
        /// Completion callback for <c>BeginReceiveMessage</c>. Decodes the outer message, then the
        /// Base64 "payload" inside it, dispatches by the payload's "cmd" value, deletes the message
        /// and always issues the next receive request.
        /// </summary>
        /// <param name="ar">Asynchronous result handed back by the MNS client.</param>
        public static void ListenerCallback(IAsyncResult ar)
        {
            try
            {
                Message message = nativeQueue.EndReceiveMessage(ar).Message;
                string envelope = Base64Helper.DecodeBase64(message.Body);
                string payload = JsonKeyHelper.GetJsonValue(envelope, "payload");
                string msg = Base64Helper.DecodeBase64(payload);
                string methodValue = JsonKeyHelper.GetJsonValue(msg, "cmd");

                if (!string.IsNullOrEmpty(methodValue))
                {
                    Console.WriteLine(msg);
                    // Both jobs run in the background; RunLogged records any exception
                    // they throw, which a bare Task.Run would drop unobserved.
                    RunLogged(delegate { DeviceResponse(methodValue, msg); });
                    RunLogged(delegate { DeviceToDb(methodValue, msg); });
                }

                if (lst.Contains(methodValue))
                {
                    WA_Door.ReceiveDeviceResponse(methodValue, msg);
                }

                _receiptHandle = message.ReceiptHandle;
                nativeQueue.DeleteMessage(_receiptHandle);
            }
            catch (Exception ex)
            {
                if (ex.Message.Trim() == "Message not exist.")
                {
                    // Raised when the receive finds no message in the queue; not logged as an error.
                    Console.WriteLine("Receive message failed");
                }
                else
                {
                    LogHelper.WriteLog("系统异常:", ex);
                }
            }
            finally
            {
                // Re-arm the listener whether or not this message was handled successfully.
                nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
            }
        }

        /// <summary>
        /// Runs <paramref name="work"/> on the thread pool and logs any exception it throws.
        /// </summary>
        /// <param name="work">Background job to run.</param>
        private static void RunLogged(Action work)
        {
            Task.Run(delegate
            {
                try
                {
                    work();
                }
                catch (Exception ex)
                {
                    LogHelper.WriteLog("系统异常:", ex);
                }
            });
        }

        /// <summary>
        /// Builds the reply for a device report and, when the handler returns one, publishes it to the device.
        /// </summary>
        /// <param name="method">Value of the "cmd" key in the payload.</param>
        /// <param name="message">Decoded JSON payload.</param>
        public static void DeviceResponse(string method, string message)
        {
            string result = string.Empty;
            var DeviceResponseBll = DeviceResponseFactory.GetTarget(Key);
            switch (method)
            {
                case "close": result = DeviceResponseBll.doorClosedReportResponse(message); break; // door-closed report
                case "login": result = DeviceResponseBll.deviceStartReportToCloudResponse(message); break; // power-on report
                case "alm": result = DeviceResponseBll.deviceFatalReportResponse(message); break; // slot alarm report
                case "allst": result = DeviceResponseBll.deviceVersionReportResponse(message); break; // heartbeat / slot status report
            }

            if (!string.IsNullOrEmpty(result))
            {
                SendMessage(result);
            }
        }

        /// <summary>
        /// Persists a device report through the handler that matches its command.
        /// </summary>
        /// <param name="method">Value of the "cmd" key in the payload.</param>
        /// <param name="message">Decoded JSON payload.</param>
        public static void DeviceToDb(string method, string message)
        {
            var DeviceResponseBll = DeviceResponseToDBFactory.GetTarget(Key);
            switch (method)
            {
                case "open": DeviceResponseBll.doorOpenToDb(message); break;
                case "close": DeviceResponseBll.doorClosedToDb(message); break; // door-closed report
                case "login": DeviceResponseBll.deviceStartToDb(message); break; // power-on report
                case "allst": DeviceResponseBll.devicePingToCloudToDb(message); break; // heartbeat / slot status report
                case "alm": DeviceResponseBll.deviceFatalToDb(message); break; // slot alarm report
            }
        }

        /// <summary>
        /// Publishes a JSON reply to the device named by the reply's "id" value,
        /// on the topic "/{productKey}/{deviceId}/user/get".
        /// </summary>
        /// <param name="Json">Reply JSON; its "id" value selects the target device.</param>
        public static void SendMessage(string Json)
        {
            try
            {
                var DeviceNum = JsonKeyHelper.GetJsonValue(Json, "id");

                // In the published listing this declaration had been absorbed into the preceding
                // comment, leaving "client" undefined below.
                DefaultAcsClient client = new DefaultAcsClient(clientProfile);

                PubRequest request = new PubRequest();
                request.ProductKey = IotParm.WA_ProductKey;
                request.TopicFullName = "/" + IotParm.WA_ProductKey + "/" + DeviceNum + "/user/get";

                // NOTE(review): Encoding.Default depends on the host's configuration - confirm the
                // device expects that encoding rather than UTF-8.
                byte[] payload = Encoding.Default.GetBytes(Json);
                request.MessageContent = Convert.ToBase64String(payload);
                request.Qos = 0;

                PubResponse response = client.GetAcsResponse(request);
                Console.WriteLine("publish message result: " + response.Success);
                Console.WriteLine(response.ErrorMessage);
            }
            catch (ServerException ex)
            {
                Console.WriteLine(ex.ErrorCode);
                Console.WriteLine(ex.ErrorMessage);
            }
            catch (ClientException ex)
            {
                Console.WriteLine(ex.ErrorCode);
                Console.WriteLine(ex.ErrorMessage);
            }
        }
    }
}
View Code

发送消息及局部指令处理【用于返回Api结果】

using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Core.Http;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Iot.Model.V20180120;
using Aliyun.MNS;
using Iot.Common;
using Iot.Dal.Pagination;
using Iot.Model;
using Iot.Model.Pagination;
using Iot.Model.WA_Device;
using MongoDB.Driver;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Iot.Dal.WA_Device
{
    // NOTE(review): the published listing had lost every generic type argument. The arguments
    // below (WA_DeviceResponseTodb, MsgModel, WA_Base<List<WA_doorportstatus>>) are reconstructed
    // from how the values are used in this file - confirm against the real project.

    /// <summary>
    /// Stores device replies in MongoDB and lets an API call wait for the reply
    /// that matches the command it sent.
    /// </summary>
    public class WA_Door
    {
        /// <summary>
        /// Default wait, in seconds, before <see cref="GetDeviceResponse"/> gives up.
        /// </summary>
        public static int Tim = 30;

        // Wait, in seconds, when all slot states are requested (dr == 0).
        private const int AllSlotsTimeoutSeconds = 45;

        // Number of reply messages that make up a complete all-slots answer.
        private const int AllSlotsMessageCount = 4;

        /// <summary>
        /// Queries stored device replies from the last ten minutes, newest first, with paging.
        /// </summary>
        /// <param name="deviceNum">Device number to filter on; not added to the database filter when null or empty.</param>
        /// <param name="tid">Transaction id to filter on; not added to the database filter when null or empty.</param>
        /// <param name="method">Command name (not used by the query itself).</param>
        /// <param name="Pagination">Paging settings, passed by reference to the pager.</param>
        /// <returns>The matching replies, or null when the query throws (the exception is logged).</returns>
        public static List<MsgModel> QueryDeviceResponse(string deviceNum, string tid, string method, ref BasePaginationModel Pagination)
        {
            try
            {
                List<MsgModel> resList = new List<MsgModel>();
                string DbName = "LogstoreDb" + "_" + DateTime.Now.ToString("yyyyMMdd");
                LogstoreDatabaseSettings settings = new LogstoreDatabaseSettings()
                {
                    LogsCollectionName = "WA_" + CommEnum.MongoDbnameEnm.DeviceResponse.ToString(),
                    DatabaseName = DbName
                };
                var client = new MongoClient(settings.ConnectionString);
                var database = client.GetDatabase(settings.DatabaseName);
                var Mongo = database.GetCollection<WA_DeviceResponseTodb>(settings.LogsCollectionName);

                var SearchList = new List<FilterDefinition<WA_DeviceResponseTodb>>();
                var builder = Builders<WA_DeviceResponseTodb>.Filter;
                if (!string.IsNullOrEmpty(deviceNum))
                {
                    SearchList.Add(builder.Eq("DeviceNum", deviceNum));
                }
                if (!string.IsNullOrEmpty(tid))
                {
                    SearchList.Add(builder.Eq("tid", tid));
                }

                // Ten-minute window ending now, expressed in UTC. The original derived UTC as
                // "local time minus 8 hours", which is only correct on a UTC+8 host.
                DateTime endTime = DateTime.UtcNow;
                DateTime startTime = endTime.AddMinutes(-10);
                SearchList.Add(builder.Gte("CreateTime", startTime));
                SearchList.Add(builder.Lte("CreateTime", endTime));

                var MongoR = Mongo.Find(Builders<WA_DeviceResponseTodb>.Filter.And(SearchList)).ToList();
                var mongorResult = MongoR.AsQueryable<WA_DeviceResponseTodb>()
                    .Where(A => A.DeviceNum == deviceNum)
                    .OrderByDescending(item => item.CreateTime);
                var result = MongoPaginationService.BasePager<WA_DeviceResponseTodb>(mongorResult, ref Pagination);

                foreach (var item in result.ToList())
                {
                    resList.Add(new MsgModel { DeviceNum = item.DeviceNum, tid = item.tid, message = item.message, Method = item.Method });
                }
                return resList;
            }
            catch (Exception ex)
            {
                LogHelper.WriteLog(ex.ToString());
            }
            return null;
        }

        /// <summary>
        /// Waits for the device's reply to a command by polling the stored replies every 500 ms.
        /// </summary>
        /// <param name="method">Command name the reply must carry.</param>
        /// <param name="deviceNum">Device number the reply must carry.</param>
        /// <param name="tid">Transaction id the reply must carry.</param>
        /// <param name="dr">
        /// Door number. 0 is the all-slots query: four reply messages are merged into one
        /// and the wait is 45 seconds instead of <see cref="Tim"/>.
        /// </param>
        /// <returns>The reply JSON, or an empty string when no complete reply arrives in time.</returns>
        public static string GetDeviceResponse(string method, string deviceNum, long tid, int dr = 1)
        {
            bool allSlots = dr == 0;
            int pageSize = (method == "drsts" && allSlots) ? AllSlotsMessageCount : 1;
            BasePaginationModel pagination = new BasePaginationModel() { PageNumber = 1, PageSize = pageSize };
            int timeoutSeconds = allSlots ? AllSlotsTimeoutSeconds : Tim;
            string TxId = tid.ToString();

            Stopwatch sw = new Stopwatch();
            sw.Start();
            string result = string.Empty;
            while (true)
            {
                try
                {
                    if (sw.Elapsed.TotalSeconds > timeoutSeconds)
                    {
                        break;
                    }

                    Thread.Sleep(500); // yield the CPU between polls

                    List<MsgModel> DeviceResponseList = QueryDeviceResponse(deviceNum, TxId, method, ref pagination);
                    if (DeviceResponseList == null || DeviceResponseList.Count == 0)
                    {
                        continue;
                    }

                    var MsgList = DeviceResponseList
                        .Where(A => A.tid == TxId && A.Method == method && A.DeviceNum == deviceNum)
                        .ToList();

                    if (MsgList.Count == 1 && !allSlots)
                    {
                        result = MsgList[0].message;
                        break;
                    }

                    // All-slots query: merge the four per-slot replies into a single answer.
                    if (MsgList.Count == AllSlotsMessageCount && allSlots)
                    {
                        string merged = MergeSlotReplies(MsgList, tid);
                        if (merged != null)
                        {
                            // The original only left its inner foreach here, so polling carried on
                            // until the 45-second timeout even though the answer was ready.
                            result = merged;
                            break;
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.WriteLog(ex.ToString());
                }
            }
            sw.Stop();
            return result;
        }

        /// <summary>
        /// Combines the first slot entry of every reply into one serialized WA_Base message.
        /// </summary>
        /// <param name="replies">Reply messages for one all-slots query.</param>
        /// <param name="tid">Transaction id written into the merged message.</param>
        /// <returns>The merged JSON, or null when not every reply carried slot data.</returns>
        private static string MergeSlotReplies(List<MsgModel> replies, long tid)
        {
            // Built fresh on every poll: the original kept one list across polls, so entries
            // from earlier polls accumulated in it.
            var merged = new WA_Base<List<WA_doorportstatus>>();
            var slots = new List<WA_doorportstatus>();

            foreach (var item in replies)
            {
                var parsed = JsonConvert.DeserializeObject<WA_Base<List<WA_doorportstatus>>>(item.message);
                if (parsed == null)
                {
                    continue;
                }

                // Take the envelope fields from the first reply that parses.
                if (string.IsNullOrEmpty(merged.cmd))
                {
                    merged.cmd = parsed.cmd;
                    merged.rst = parsed.rst;
                    merged.id = parsed.id;
                    merged.tid = tid;
                }

                // Read dat[0] only after the guard; the original indexed it first and threw on empty replies.
                if (parsed.dat != null && parsed.dat.Count > 0)
                {
                    slots.Add(parsed.dat[0]);
                }
            }

            if (slots.Count != AllSlotsMessageCount)
            {
                return null;
            }

            merged.dat = slots;
            return JsonConvert.SerializeObject(merged);
        }

        /// <summary>
        /// Stores a device reply so that <see cref="GetDeviceResponse"/> can find it.
        /// </summary>
        /// <param name="Method">Command name of the reply.</param>
        /// <param name="result">Reply JSON; its "tid" and "id" values are stored alongside it.</param>
        public static void ReceiveDeviceResponse(string Method, string result)
        {
            var Txid = JsonKeyHelper.GetJsonValue(result, "tid");
            var DeviceNum = JsonKeyHelper.GetJsonValue(result, "id");

            LogstoreDatabaseSettings st = new LogstoreDatabaseSettings()
            {
                LogsCollectionName = "WA_" + CommEnum.MongoDbnameEnm.DeviceResponse.ToString()
            };
            var TbModel = new WA_DeviceResponseTodb
            {
                CreateTime = DateTime.Now,
                DeviceNum = DeviceNum,
                message = result,
                Method = Method,
                tid = Txid
            };
            new MongoLogService<WA_DeviceResponseTodb>(st).Create(TbModel);
        }
    }

    /// <summary>
    /// A stored device reply as returned to callers.
    /// </summary>
    public class MsgModel
    {
        public string tid { get; set; }
        public string message { get; set; }
        public string Method { get; set; }
        public string DeviceNum { get; set; }
    }

    /// <summary>
    /// Sends commands from the cloud to a device through the IoT platform's Pub API.
    /// </summary>
    public class WA_DeviceHelper
    {
        #region Cloud-to-device commands (the device subscribes to the topic)
        static IClientProfile clientProfile = DefaultProfile.GetProfile("cn-shanghai", IotParm._accessKeyId, IotParm._secretAccessKey);

        /// <summary>
        /// Publishes a JSON command to the topic "/{productKey}/{DeviceNum}/user/get".
        /// </summary>
        /// <param name="DeviceNum">Target device number.</param>
        /// <param name="json">Command JSON; sent Base64-encoded.</param>
        /// <returns>The platform's response to the publish request.</returns>
        /// <exception cref="ServerException">Logged and rethrown.</exception>
        /// <exception cref="ClientException">Logged and rethrown.</exception>
        public static PubResponse SendMsg(string DeviceNum, string json)
        {
            LogHelper.WriteLog("发送数据:" + json);
            DefaultAcsClient client = new Aliyun.Acs.Core.DefaultAcsClient(clientProfile);

            PubRequest request = new PubRequest();
            request.ProductKey = IotParm.WA_ProductKey;
            request.TopicFullName = "/" + IotParm.WA_ProductKey + "/" + DeviceNum + "/user/get";

            // NOTE(review): Encoding.Default depends on the host's configuration - confirm the
            // device expects that encoding rather than UTF-8.
            byte[] payload = Encoding.Default.GetBytes(json);
            request.MessageContent = Convert.ToBase64String(payload);
            request.Qos = 0;

            try
            {
                return client.GetAcsResponse(request);
            }
            catch (ServerException ex)
            {
                LogHelper.WriteLog("服务端异常:" + ex.ToString());
                throw; // "throw ex;" would reset the stack trace
            }
            catch (ClientException ex)
            {
                LogHelper.WriteLog("客户端异常:" + ex.ToString());
                throw; // "throw ex;" would reset the stack trace
            }
        }
        #endregion
    }
}
View Code

@天才卧龙的博客

 

上一篇:NetCore3.1 使用 mongoDb 存储日志,提升查询效率
下一篇:消息队列 RocketMQ 并发量十万级

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2025年05月10日 13时15分36秒