RabbitMQ帮助类的封装
基本部分
/// <summary>
/// Helper class that wraps access to a RabbitMQ broker.
/// </summary>
public class RabbitMQInvoker
{
    #region Identity

    // Held in a static field, so the same connection object is visible to every
    // instance of this class.
    // NOTE(review): the host name and credentials below are per instance while the
    // connection is static - confirm that instances are never created with
    // different settings.
    private static IConnection _CurrentConnection;

    // Broker host name and credentials, fixed at construction time.
    private readonly string _HostName;
    private readonly string _UserName;
    private readonly string _Password;

    #endregion

    /// <summary>
    /// Creates an invoker and stores the broker settings; no connection is opened here.
    /// </summary>
    /// <param name="hostName">Broker host name; defaults to "localhost".</param>
    /// <param name="userName">User name used to log in; defaults to "guest".</param>
    /// <param name="password">Password used to log in; defaults to "guest".</param>
    public RabbitMQInvoker(string hostName = "localhost", string userName = "guest", string password = "guest")
    {
        _HostName = hostName;
        _UserName = userName;
        _Password = password;
    }

    // ...... (the remaining members are omitted from this excerpt)
}
初始化连接
#region Connection initialization

// Monitor object taken while the shared connection is being (re)created.
private static object RabbitMQInvoker_InitLock = new object();

/// <summary>
/// Makes sure the shared connection exists and reports itself as open.
/// When it does not, a new connection is created from this instance's
/// host name, password and user name and stored in the static field.
/// </summary>
private void InitConnection()
{
    // First check without taking the lock: nothing to do when the connection is usable.
    if (_CurrentConnection != null && _CurrentConnection.IsOpen)
    {
        return;
    }

    lock (RabbitMQInvoker_InitLock)
    {
        // Check again under the lock, since another thread may have created the
        // connection while this one was waiting.
        if (_CurrentConnection != null && _CurrentConnection.IsOpen)
        {
            return;
        }

        var factory = new ConnectionFactory();
        factory.HostName = _HostName;
        factory.Password = _Password;
        factory.UserName = _UserName;

        _CurrentConnection = factory.CreateConnection();
    }
}

#endregion
初始化交换机与队列绑定
#region Exchange and queue initialization

// Remembers which exchanges ("InitExchange_{exchange}") and which queue bindings
// ("InitBindQueue_{exchange}_{queue}") have already been declared, so the broker
// round trip happens only once per key. The first membership check below runs
// outside the lock while other threads may be writing, which a plain Dictionary
// does not support; a ConcurrentDictionary makes that unlocked read safe.
private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, bool> RabbitMQInvoker_ExchangeQueue =
    new System.Collections.Concurrent.ConcurrentDictionary<string, bool>();

// Serializes the declare/bind calls so each key is initialized by one thread only.
private static readonly object RabbitMQInvoker_BindQueueLock = new object();

/// <summary>
/// Declares <paramref name="exchangeName"/> as a durable, non-auto-delete fanout
/// exchange the first time it is requested; later calls for the same name return
/// without contacting the broker.
/// </summary>
/// <param name="exchangeName">Name of the exchange to declare.</param>
private void InitExchange(string exchangeName)
{
    string cacheKey = $"InitExchange_{exchangeName}";
    if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
    {
        return;
    }

    lock (RabbitMQInvoker_BindQueueLock)
    {
        // Re-check under the lock: another thread may have declared it meanwhile.
        if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
        {
            return;
        }

        this.InitConnection();
        using (IModel channel = _CurrentConnection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
        }

        // Recorded only after the declaration succeeded, so a failure is retried next time.
        RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
    }
}

/// <summary>
/// Declares the fanout exchange and the durable queue named by
/// <paramref name="rabbitMQConsumerModel"/> and binds the queue to the exchange
/// with an empty routing key. The work is done once per exchange/queue pair.
/// </summary>
/// <param name="rabbitMQConsumerModel">Supplies the exchange name and the queue name.</param>
/// <exception cref="ArgumentNullException"><paramref name="rabbitMQConsumerModel"/> is null.</exception>
private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel)
{
    if (rabbitMQConsumerModel == null)
    {
        throw new ArgumentNullException(nameof(rabbitMQConsumerModel));
    }

    string cacheKey = $"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}";
    if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
    {
        return;
    }

    lock (RabbitMQInvoker_BindQueueLock)
    {
        // Re-check under the lock: another thread may have bound the queue meanwhile.
        if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
        {
            return;
        }

        this.InitConnection();
        using (IModel channel = _CurrentConnection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: rabbitMQConsumerModel.ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
            channel.QueueDeclare(queue: rabbitMQConsumerModel.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            channel.QueueBind(queue: rabbitMQConsumerModel.QueueName, exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, arguments: null);
        }

        // Recorded only after declare + bind succeeded, so a failure is retried next time.
        RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
    }
}

#endregion
发送消息
#region Send message

/// <summary>
/// Publishes <paramref name="message"/> (UTF-8 encoded) to
/// <paramref name="exchangeName"/> with an empty routing key, inside an AMQP
/// transaction on a channel that is opened and disposed per call.
/// </summary>
/// <remarks>
/// A failure while publishing is written to the console and the transaction is
/// rolled back; the exception is not rethrown, so the caller is not informed.
/// </remarks>
/// <param name="exchangeName">Exchange to publish to; it is declared on first use.</param>
/// <param name="message">Text payload to publish.</param>
public void Send(string exchangeName, string message)
{
    this.InitExchange(exchangeName);

    // InitExchange only touches the connection the first time an exchange is seen,
    // so re-validate it here; InitConnection returns immediately when the
    // connection is already open.
    this.InitConnection();

    using (var channel = _CurrentConnection.CreateModel())
    {
        try
        {
            channel.TxSelect();
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName,
                                 routingKey: string.Empty,
                                 basicProperties: null,
                                 body: body);
            channel.TxCommit();
            Console.WriteLine($" [x] Sent {body.Length}");
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}");

            // The failure may have closed the channel, in which case a rollback
            // would throw again and escape this handler. Only attempt it on an
            // open channel and keep a rollback failure from masking the
            // best-effort behaviour of this method.
            if (channel.IsOpen)
            {
                try
                {
                    channel.TxRollback();
                }
                catch (Exception rollbackEx)
                {
                    Console.WriteLine(rollbackEx.Message);
                }
            }
        }
    }
}

#endregion
接收消息
#region Receive

/// <summary>
/// Binds the queue described by <paramref name="rabbitMQConsumerMode"/> and
/// starts a background task that consumes it with manual acknowledgements.
/// Each message body is decoded as UTF-8 and passed to <paramref name="func"/>:
/// a <c>true</c> result acknowledges the message, anything else rejects it with
/// requeue.
/// </summary>
/// <param name="rabbitMQConsumerMode">Supplies the exchange name and the queue name.</param>
/// <param name="func">Message handler; return <c>true</c> when the message was processed.</param>
/// <exception cref="ArgumentNullException">An argument is null.</exception>
public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func<string, bool> func)
{
    if (rabbitMQConsumerMode == null)
    {
        throw new ArgumentNullException(nameof(rabbitMQConsumerMode));
    }

    // Fail at registration time instead of on the first delivery inside the callback.
    if (func == null)
    {
        throw new ArgumentNullException(nameof(func));
    }

    this.InitBindQueue(rabbitMQConsumerMode);

    // InitBindQueue skips the connection check once the binding is cached, so make
    // sure the shared connection is usable before a channel is created from it.
    this.InitConnection();

    Task.Run(() =>
    {
        using (var channel = _CurrentConnection.CreateModel())
        {
            var consumer = new EventingBasicConsumer(channel);

            // NOTE(review): a prefetch count of 0 is documented by RabbitMQ as
            // "no limit" - confirm that unbounded prefetch is intended.
            channel.BasicQos(0, 0, true);

            consumer.Received += (sender, ea) =>
            {
                bool handled;
                try
                {
                    string str = Encoding.UTF8.GetString(ea.Body.ToArray());
                    handled = func(str);
                }
                catch (Exception ex)
                {
                    // A throwing handler used to leave the delivery neither acked
                    // nor rejected; treat it the same as a 'false' result.
                    Console.WriteLine(ex.Message);
                    handled = false;
                }

                if (handled)
                {
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                else
                {
                    channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                }
            };

            channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName,
                                 autoAck: false,
                                 consumer: consumer);
            Console.WriteLine($" Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");

            // Blocks this background task on console input so that the using block
            // does not dispose the channel while the consumer is active.
            // NOTE(review): if ReadLine returns right away (e.g. no interactive
            // console), the channel is disposed immediately - verify the hosting
            // environment or replace this with an explicit lifetime signal.
            Console.ReadLine();
            Console.WriteLine($" After Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");
        }
    });
}

#endregion