
本文共 36354 字,大约阅读时间需要 121 分钟。
��������������������������� ���
���������������������������RabbitMQ������������������������������������������������������������������������������������������RabbitMQ���������������������������������������������������������������������RabbitMQ������������������������������������������������������������������������������������������������������������������������TCP������������������������������������������������������������ADO.NET���������������������������DB���������SQL SERVER���MYSQL���������������������������DB���������������������������������������using���������������������������������DB���������������������������������������������������������������������������������������������������������������������������DB������������������������������������������������������������������������MQ������������������MQ������RabbitMQ���������������������������������������������������������������������������������������������������������������������������������������������DB������������������������������MQ������������������������������������������������������������������������������������������DB���������MQ������ ������������������������TCP���������������TCP���������������������������������������������������������������������������������������������������TCP���������DB���������������MQ���������������������������������������������������������������������������������������DB���������������������������������������������������������������������������������������������������DB���������������������������������DB������������������������������������������������������������������������������������������������������������TCP���������������������MQ������������������������������TCP���������������������������������TCP���������������������������������������������������������������������������������������������������������������MQ���������������������������������������������������TCP������������������������������������������������������������������������������������������������������������������MQ������������������������������������������������������DB���������������������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
���������������������������������������������������������������������
������������������������MQHelper���������������������������������
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using RabbitMQ.Util;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Web.Caching;using System.Web;using System.Configuration;using System.IO;using System.Collections.Concurrent;using System.Threading;using System.Runtime.CompilerServices;namespace Zuowj.Core{ public class MQHelper { private const string CacheKey_MQConnectionSetting = "MQConnectionSetting"; private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount"; private readonly static ConcurrentQueueFreeConnectionQueue;//������������������������ private readonly static ConcurrentDictionary BusyConnectionDic;//������������������������������������ private readonly static ConcurrentDictionary MQConnectionPoolUsingDicNew;//������������������ private readonly static Semaphore MQConnectionPoolSemaphore; private readonly static object freeConnLock = new object(), addConnLock = new object(); private static int connCount = 0; public const int DefaultMaxConnectionCount = 30;//��������������������������������� public const int DefaultMaxConnectionUsingCount = 10000;//��������������������������������� private static int MaxConnectionCount { get { if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null) { return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]); } else { int mqMaxConnectionCount = 0; string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount]; if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0) { mqMaxConnectionCount = DefaultMaxConnectionCount; } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath)); return mqMaxConnectionCount; } } } /// /// ������������ /// /// ��������������� /// ������������ /// ������������ ///private static ConnectionFactory CrateFactory() { var mqConnectionSetting = GetMQConnectionSetting(); var connectionfactory = new ConnectionFactory(); connectionfactory.HostName = mqConnectionSetting[0]; connectionfactory.UserName = mqConnectionSetting[1]; connectionfactory.Password = mqConnectionSetting[2]; if (mqConnectionSetting.Length > 3) //��������������� { connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]); } return connectionfactory; } private static string[] GetMQConnectionSetting() { string[] mqConnectionSetting = null; if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null) { //MQConnectionSetting=Host IP|;userid;|;password string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting]; if (!string.IsNullOrWhiteSpace(mqConnSettingStr)) { mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);//������MQ������������������������������������������������������EncryptUtility���������AES��������������������������������������������������� if (mqConnSettingStr.Contains(";|;")) { mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries); } } if (mqConnectionSetting == null || mqConnectionSetting.Length < 3) { throw new Exception("MQConnectionSetting���������������������������"); } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath)); } else { mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[]; } return mqConnectionSetting; } public static IConnection CreateMQConnection() { var factory = CrateFactory(); factory.AutomaticRecoveryEnabled = true;//������������ var connection = factory.CreateConnection(); connection.AutoClose = false; return connection; } static MQHelper() { FreeConnectionQueue = new ConcurrentQueue (); BusyConnectionDic = new ConcurrentDictionary (); MQConnectionPoolUsingDicNew = new ConcurrentDictionary ();//������������������ MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//��������������������������������������������� } public static IConnection CreateMQConnectionInPoolNew() { SelectMQConnectionLine: MQConnectionPoolSemaphore.WaitOne();//��� DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //���������������������������������������������������������������������������,������������������������������������ { mqConnection.Close(); mqConnection.Dispose(); // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); } BusyConnectionDic[mqConnection] = true;//��������������������������� MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//���������������1 // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } private static void ResetMQConnectionToFree(IConnection connection) { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //��������������������� { // BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } else { // BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//������������������������������������������>MaxConnectionCount��������������������������� { connection.Close(); connection.Dispose(); } else { FreeConnectionQueue.Enqueue(connection);//������������������������������������������������������ } MQConnectionPoolSemaphore.Release();//������������������������������ //Interlocked.Decrement(ref connCount); //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount); } } /// /// ������������ /// /// ������������������������ ///������������ /// ������������ /// ��������������� /// ������ ///public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true) { try { using (var channel = connection.CreateModel())//������������������ { // ��������������������������������������������������������������������������������������������������������������������������������������� channel.QueueDeclare(queueName, durable, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1���������������,2.��������������� if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish("", queueName, properties, body); } return string.Empty; } catch (Exception ex) { return ex.ToString(); } finally { ResetMQConnectionToFree(connection); } } /// /// ������������ /// /// ������������������������ /// ������������ /// ��������������� /// ������������������ /// ��������������������������� public static void ConsumeMsg(IConnection connection, string queueName, bool durable, FuncdealMessage, Action saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //������������ channel.BasicQos(0, 1, false); //������������������������ var consumer = new QueueingBasicConsumer(channel); //��������������� // ������������������������������������������������������������������������������������������������������ channel.BasicConsume(queueName, false, consumer); while (true) //������������������������ { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //������������ string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //������������������������ } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //������������������ } else { channel.BasicNack(ea.DeliveryTag, false, false); //������������������ } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { ResetMQConnectionToFree(connection); } } /// /// ������������������������ /// /// ������������������������ /// ������������ /// ��������� /// ������������������ public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, FuncdealMessage) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, durable, false, false, null); //������������ channel.BasicQos(0, 1, false); //������������������������ uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //��������������� // ������������������������������������������������������������������������������������������������������ channel.BasicConsume(QueueName, false, consumer); ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //������������ try { var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { throw ex; } finally { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //������������������������ } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //������������������ } else { channel.BasicNack(ea.DeliveryTag, false, false); //������������������ } } } else { dealMessage(string.Empty); } } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } } /// /// ��������������������� /// /// /// ///public static int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //������������ msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { throw ex; } finally { ResetMQConnectionToFree(connection); } return msgCount; } } public enum ConsumeAction { ACCEPT, // ������������ RETRY, // ��������������������������������������������� REJECT, // ��������������������������� }}
������������������������������������������������������������
���������������������������������
FreeConnectionQueue ���������������������������������������������������Queue���������������������������1������������������������������������������������1������������Queue���������������������������������������Queue������������������Queue������ConcurrentQueue���
BusyConnectionDic ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������FreeConnectionQueue ���������������������
MQConnectionPoolUsingDicNew ������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
MQConnectionPoolSemaphore ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
���������������������
1.MaxConnectionCount������������������������������������������������������������������������������������CONFIG������������������30���
2.DefaultMaxConnectionUsingCount������������������������������������������������������������������������������������������������������1000������������������������������������CONFIG���������������MaxConnectionCount������������������������������������������
3.CreateMQConnectionInPoolNew������������������������MQ������������������������������������������������������������������������������������������������������������������������������������������������
������3.1 ������MQConnectionPoolSemaphore.WaitOne() ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
������3.2FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount���������������������������+������������������������������������������������������������������������������MQ���������������FreeConnectionQueue.TryDequeue(out mqConnection) ���������������������������������������������������������������������������������������������������������������������������������������������������������������
������3.3MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen ���������������������������������������������������������������������������,���������������������������������������������������������������������������������������������������
������3.4BusyConnectionDic[mqConnection] = true;������������������������������MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1; ���������������1������������������������������������������������������
4.ResetMQConnectionToFree���������������������������������������������MQ������������������������������������������������������������������������������������������������������������������������������������������������������������������
������������������������������������������������MQHelper���������������������������������������������������
���������������������������������������
������������������������������
public string GetMessage(string queueName) { string message = null; try { var connection = MQHelper.CreateMQConnectionInPoolNew(); MQHelper.ConsumeMsgSingle(connection, queueName, true, (msg) => { message = msg; return ConsumeAction.ACCEPT; }); } catch (Exception ex) { BaseUtil.Logger.Error(string.Format("MQHelper.ConsumeMsgSingle Error:{0}", ex.Message), ex); message = "ERROR:" + ex.Message; } //BaseUtil.Logger.InfoFormat("���{0}���������������������������(������������:{1})���������������������:{2}", Interlocked.Increment(ref requestCount), queueName, message); return message; }
������������������������
public string SendMessage(string queueName, string msg) { string result = null; try { var connection = MQHelper.CreateMQConnectionInPoolNew(); result = MQHelper.SendMsg(connection, queueName, msg); } catch (Exception ex) { BaseUtil.Logger.Error(string.Format("MQHelper.SendMessage Error:{0}", ex.Message), ex); result = ex.Message; } return result; }
���������������������������������
public int GetMessageCount(string queueName) { int result = -1; try { var connection = MQHelper.CreateMQConnectionInPoolNew(); result = MQHelper.GetMessageCount(connection, queueName); } catch (Exception ex) { BaseUtil.Logger.Error(string.Format("MQHelper.GetMessageCount Error:{0}", ex.Message), ex); result = -1; } return result; }
���������������������BaseUtil.Logger ���Log4Net���������������������������������������������������������������������ConsumeMsg������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������MQHelper���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
2019-7-3������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using RabbitMQ.Util;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Web.Caching;using System.Web;using System.Configuration;using System.IO;using System.Collections.Concurrent;using System.Threading;using System.Runtime.CompilerServices;using System.Net.Sockets;namespace KYLDMQService.Core{ public class MQHelper { private const string CacheKey_MQConnectionSetting = "MQConnectionSetting"; private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount"; public const int DefaultMaxConnectionCount = 30;//��������������������������������� public const int DefaultMaxConnectionUsingCount = 10000;//��������������������������������� public const int DefaultReTryConnectionCount = 1;//������������������������ private static int MaxConnectionCount { get { if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null) { return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]); } else { int mqMaxConnectionCount = 0; string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount]; if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0) { mqMaxConnectionCount = DefaultMaxConnectionCount; } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath)); return mqMaxConnectionCount; } } } ////// ������������ /// /// ��������������� /// ������������ /// ������������ ///private static ConnectionFactory CrateFactory() { var mqConnectionSetting = GetMQConnectionSetting(); var connectionfactory = new ConnectionFactory(); connectionfactory.HostName = mqConnectionSetting[0]; connectionfactory.UserName = mqConnectionSetting[1]; connectionfactory.Password = mqConnectionSetting[2]; if (mqConnectionSetting.Length > 3) //��������������� { connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]); } return connectionfactory; } private static string[] GetMQConnectionSetting() { string[] mqConnectionSetting = null; if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null) { //MQConnectionSetting=Host IP|;userid;|;password string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting]; if (!string.IsNullOrWhiteSpace(mqConnSettingStr)) { mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr); if (mqConnSettingStr.Contains(";|;")) { mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries); } } if (mqConnectionSetting == null || mqConnectionSetting.Length < 3) { throw new Exception("MQConnectionSetting���������������������������"); } string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath)); } else { mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[]; } return mqConnectionSetting; } public static IConnection CreateMQConnection() { var factory = CrateFactory(); factory.AutomaticRecoveryEnabled = true;//������������ var connection = factory.CreateConnection(); connection.AutoClose = false; return connection; } private readonly static ConcurrentQueue FreeConnectionQueue;//������������������������ private readonly static ConcurrentDictionary BusyConnectionDic;//������������������������������������ private readonly static ConcurrentDictionary MQConnectionPoolUsingDicNew;//������������������ private readonly static Semaphore MQConnectionPoolSemaphore; private readonly static object freeConnLock = new object(), addConnLock = new object(); private static int connCount = 0; static MQHelper() { FreeConnectionQueue = new ConcurrentQueue (); BusyConnectionDic = new ConcurrentDictionary (); MQConnectionPoolUsingDicNew = new ConcurrentDictionary ();//������������������ MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//��������������������������������������������� } public static IConnection CreateMQConnectionInPoolNew() { MQConnectionPoolSemaphore.WaitOne(10000);//��� MaxConnectionCount) //{ // System.Diagnostics.Debug.WriteLine("ConnectionCount:" + totalCount); // BaseUtil.Logger.DebugFormat("more than totalCount:{0}",totalCount); //} IConnection mqConnection = null; try { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//��������������������������������������������������������������������������� { lock (addConnLock) { if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount) { mqConnection = CreateMQConnection(); BusyConnectionDic[mqConnection] = true;//��������������������������� MQConnectionPoolUsingDicNew[mqConnection] = 1; // BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } } } if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //������������������������������������������������������������ { // BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); return CreateMQConnectionInPoolNew(); } else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //���������������������������������������������������������������������������,������������������������������������ { if (mqConnection.IsOpen) { mqConnection.Close(); } mqConnection.Dispose(); // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); mqConnection = CreateMQConnection(); MQConnectionPoolUsingDicNew[mqConnection] = 0; // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count); } BusyConnectionDic[mqConnection] = true;//��������������������������� MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//���������������1 // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); return mqConnection; } catch //������������������������������������������������������������������Connection������������������������������������������������������������������ { if (mqConnection != null) { ResetMQConnectionToFree(mqConnection); } else { MQConnectionPoolSemaphore.Release(); } throw; } } private static void ResetMQConnectionToFree(IConnection connection) { try { lock (freeConnLock) { bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //��������������������� { // BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } else//������������������������������������������������ { if (!BusyConnectionDic.TryRemove(connection, out result)) { BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic(2 times):{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count); } } if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//������������������������������������������>MaxConnectionCount��������������������������� { connection.Close(); connection.Dispose(); } else if (connection.IsOpen)//���������OPEN������������������������������������������������ { FreeConnectionQueue.Enqueue(connection);//������������������������������������������������������ } } } catch { throw; } finally { MQConnectionPoolSemaphore.Release();//������������������������������ } //Interlocked.Decrement(ref connCount); //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount); } /// /// ������������ /// /// ������������������������ ///������������ /// ������������ /// ��������������� /// ������ ///public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true) { bool reTry = false; int reTryCount = 0; string sendErrMsg = null; do { reTry = false; try { using (var channel = connection.CreateModel())//������������������ { // ��������������������������������������������������������������������������������������������������������������������������������������� channel.QueueDeclare(queueName, durable, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1���������������,2.��������������� if (!durable) properties = null; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish("", queueName, properties, body); } sendErrMsg = string.Empty; } catch (Exception ex) { if (BaseUtil.IsIncludeException (ex)) { if ((++reTryCount) <= DefaultReTryConnectionCount)//���������1��� { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } sendErrMsg = ex.ToString(); } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return sendErrMsg; } /// /// ������������ /// /// ������������������������ /// ������������ /// ��������������� /// ������������������ /// ��������������������������� public static void ConsumeMsg(IConnection connection, string queueName, bool durable, FuncdealMessage, Action saveLog = null) { try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queueName, durable, false, false, null); //������������ channel.BasicQos(0, 1, false); //������������������������ var consumer = new QueueingBasicConsumer(channel); //��������������� // ������������������������������������������������������������������������������������������������������ channel.BasicConsume(queueName, false, consumer); while (true) //������������������������ { ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //������������ string message = null; try { var body = ea.Body; message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } catch (Exception ex) { if (saveLog != null) { saveLog(message, ex); } } if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //������������������������ } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //������������������ } else { channel.BasicNack(ea.DeliveryTag, false, false); //������������������ } } } } catch (Exception ex) { if (saveLog != null) { saveLog("QueueName:" + queueName, ex); } throw ex; } finally { //MQConnectionPool[connection] = false;//������������ ResetMQConnectionToFree(connection); } } /// /// ������������������������ /// /// ������������������������ /// ������������ /// ��������� /// ������������������ public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, FuncdealMessage) { bool reTry = false; int reTryCount = 0; ConsumeAction consumeResult = ConsumeAction.RETRY; IModel channel = null; BasicDeliverEventArgs ea = null; do { reTry = false; try { channel = connection.CreateModel(); channel.QueueDeclare(QueueName, durable, false, false, null); //������������ channel.BasicQos(0, 1, false); //������������������������ uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0) { var consumer = new QueueingBasicConsumer(channel); //��������������� // ������������������������������������������������������������������������������������������������������ channel.BasicConsume(QueueName, false, consumer); ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //������������ var body = ea.Body; var message = Encoding.UTF8.GetString(body); consumeResult = dealMessage(message); } else { dealMessage(string.Empty); } } catch (Exception ex) { if (BaseUtil.IsIncludeException (ex)) { if ((++reTryCount) <= DefaultReTryConnectionCount)//���������1��� { if (channel != null) channel.Dispose(); ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } throw ex; } finally { if (!reTry) { if (channel != null && ea != null) { if (consumeResult == ConsumeAction.ACCEPT) { channel.BasicAck(ea.DeliveryTag, false); //������������������������ } else if (consumeResult == ConsumeAction.RETRY) { channel.BasicNack(ea.DeliveryTag, false, true); //������������������ } else { channel.BasicNack(ea.DeliveryTag, false, false); //������������������ } } if (channel != null) channel.Dispose(); ResetMQConnectionToFree(connection); } } } while (reTry); } /// /// ��������������������� /// /// /// ///public static int GetMessageCount(IConnection connection, string QueueName) { int msgCount = 0; bool reTry = false; int reTryCount = 0; do { reTry = false; try { using (var channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); //������������ msgCount = (int)channel.MessageCount(QueueName); } } catch (Exception ex) { if (BaseUtil.IsIncludeException (ex)) { if ((++reTryCount) <= DefaultReTryConnectionCount)//���������1��� { ResetMQConnectionToFree(connection); connection = CreateMQConnectionInPoolNew(); reTry = true; } } throw ex; } finally { if (!reTry) { ResetMQConnectionToFree(connection); } } } while (reTry); return msgCount; } } public enum ConsumeAction { ACCEPT, // ������������ RETRY, // ��������������������������������������������� REJECT, // ��������������������������� }}
������
发表评论
最新留言
关于作者
