RabbitMQ 消息生产者操作类
发布日期:2021-05-14 17:45:23 浏览次数:18 分类:精选文章

本文共 2831 字,大约阅读时间需要 9 分钟。

/// <summary>
/// Singleton helper that owns one shared RabbitMQ connection and publishes
/// batches of text messages using publisher confirms.
/// </summary>
public class RabbitMQHelper
{
    // Shared by the single helper instance; assigned once in the private constructor.
    private static IConnection rabbitMqConnection = null;

    // volatile: the field is read outside the lock in Instance(), so the write made
    // inside the lock must be visible to other threads before they use the object.
    private static volatile RabbitMQHelper _Instance = null;
    private static readonly object objLock = new object();

    /// <summary>
    /// Builds a connection factory from the given broker settings and opens the connection.
    /// </summary>
    private RabbitMQHelper(string hostName, int port, string virtualHost, string userName, string pwd)
    {
        ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = hostName,
            Port = port,
            VirtualHost = virtualHost,
            UserName = userName,
            Password = pwd,
            Protocol = Protocols.DefaultProtocol,
            AutomaticRecoveryEnabled = true
        };
        rabbitMqConnection = rabbitMqFactory.CreateConnection();
    }

    /// <summary>
    /// Best-effort close of the shared connection when the helper is finalized.
    /// </summary>
    ~RabbitMQHelper()
    {
        try
        {
            if (rabbitMqConnection != null)
            {
                rabbitMqConnection.Close();
            }
        }
        catch
        {
            // An exception escaping a finalizer terminates the process, so a failed
            // close is deliberately ignored here.
        }
    }

    /// <summary>
    /// Returns the single helper instance, creating it (and opening the connection)
    /// on the first call.
    /// </summary>
    /// <remarks>
    /// The connection settings are only used by the call that creates the instance;
    /// later calls return the existing instance and ignore their arguments.
    /// </remarks>
    /// <param name="hostName">Broker host name.</param>
    /// <param name="port">Broker port.</param>
    /// <param name="virtualHost">Virtual host to connect to.</param>
    /// <param name="userName">User name used to authenticate.</param>
    /// <param name="pwd">Password used to authenticate.</param>
    /// <returns>The shared <see cref="RabbitMQHelper"/> instance.</returns>
    public static RabbitMQHelper Instance(string hostName, int port, string virtualHost, string userName, string pwd)
    {
        if (_Instance == null)
        {
            lock (objLock)
            {
                if (_Instance == null)
                {
                    _Instance = new RabbitMQHelper(hostName, port, virtualHost, userName, pwd);
                }
            }
        }
        return _Instance;
    }

    /// <summary>
    /// Returns the shared connection opened by the constructor.
    /// </summary>
    public IConnection Connection()
    {
        return rabbitMqConnection;
    }

    /// <summary>
    /// Declares the exchange and queue, binds them, and publishes every message in
    /// <paramref name="listMessage"/> as a persistent UTF-8 encoded message, then
    /// waits for the broker to confirm the batch.
    /// </summary>
    /// <param name="exchangeName">Exchange to declare and publish to.</param>
    /// <param name="routingKey">Routing key used for both the binding and the publish.</param>
    /// <param name="queueName">Durable queue to declare and bind to the exchange.</param>
    /// <param name="listMessage">Messages to publish; nothing happens when null or empty.</param>
    /// <param name="type">Exchange type passed to ExchangeDeclare; defaults to "direct".</param>
    public void MessageProducer(string exchangeName, string routingKey, string queueName, List<string> listMessage, string type = "direct")
    {
        if (listMessage == null || listMessage.Count == 0)
        {
            return;
        }

        using (IModel channel = rabbitMqConnection.CreateModel())
        {
            // Dead-letter settings attached to the queue at declaration time.
            var args = new Dictionary<string, object>
            {
                { "x-dead-letter-exchange", "exchange.dlx" },
                { "x-dead-letter-routing-key", "rk.sp.mipo" }
            };

            channel.ExchangeDeclare(exchangeName, type, durable: true, autoDelete: false, arguments: null);
            // Declared once; a second identical declaration would only cost a round trip.
            channel.QueueDeclare(queueName, durable: true, autoDelete: false, exclusive: false, arguments: args);
            channel.QueueBind(queueName, exchangeName, routingKey);

            var props = channel.CreateBasicProperties();
            // Persistent = true corresponds to delivery mode 2, so it is not set separately.
            props.Persistent = true;

            // Enable publisher confirms before publishing so the whole batch can be awaited.
            channel.ConfirmSelect();
            foreach (var msg in listMessage)
            {
                channel.BasicPublish(exchangeName, routingKey, basicProperties: props, body: Encoding.UTF8.GetBytes(msg));
            }
            channel.WaitForConfirmsOrDie();
        }
    }
}

以上代码封装了 RabbitMQ 的连接管理:通过单例模式复用同一个连接,并支持消息的批量发布。队列声明时配置了死信交换机(dead letter exchange),用于接收被拒绝或过期的消息;同时通过消息持久化和发布确认(publisher confirms)机制来保证消息的可靠投递。

上一篇:RabbitMQ 消费者操作类
下一篇:Redis与Memcached

发表评论

最新留言

关注你微信了!
[***.104.42.241]2025年04月07日 09时43分01秒