当前位置: 首页 > news >正文

RabbitMQ帮助类的封装

RabbitMQ帮助类的封装

基本部分

public class RabbitMQInvoker
{#region Identy private static IConnection _CurrentConnection = null;private readonly string _HostName = null;private readonly string _UserName = null;private readonly string _Password = null;#endregionpublic RabbitMQInvoker(string hostName = "localhost", string userName = "guest", string password = "guest"){this._HostName = hostName;this._UserName = userName;this._Password = password;}......
}

初始化链接

	#region 初始化链接 private static object RabbitMQInvoker_InitLock = new object();private void InitConnection(){if (_CurrentConnection == null || !_CurrentConnection.IsOpen){lock (RabbitMQInvoker_InitLock){if (_CurrentConnection == null || !_CurrentConnection.IsOpen){var factory = new ConnectionFactory(){HostName = this._HostName,Password = this._Password,UserName = this._UserName};_CurrentConnection = factory.CreateConnection();}}}}#endregion

初始化交换机

	#region 初始化交换机 private static Dictionary<string, bool> RabbitMQInvoker_ExchangeQueue = new Dictionary<string, bool>();private static object RabbitMQInvoker_BindQueueLock = new object();/// <summary>/// 必须先声明exchange--检查+初始化/// </summary>/// <param name="rabbitMQConsumerModel"></param>private void InitExchange(string exchangeName){if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitExchange_{exchangeName}"))//没用api确认{lock (RabbitMQInvoker_BindQueueLock){if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitExchange_{exchangeName}")){this.InitConnection();using (IModel channel = _CurrentConnection.CreateModel()){channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);}RabbitMQInvoker_ExchangeQueue[$"InitExchange_{exchangeName}"] = true;}}}}/// <summary>/// 初始化绑定关系/// </summary>/// <param name="rabbitMQConsumerModel"></param>private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel){if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}")){lock (RabbitMQInvoker_BindQueueLock){if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}")){this.InitConnection();using (IModel channel = _CurrentConnection.CreateModel()){channel.ExchangeDeclare(exchange: rabbitMQConsumerModel.ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);channel.QueueDeclare(queue: rabbitMQConsumerModel.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: rabbitMQConsumerModel.QueueName, exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, arguments: null);}RabbitMQInvoker_ExchangeQueue[$"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}"] = true;}}}}#endregion

发送信息

	#region 发送消息/// <summary>/// 只管exchange---/// 4种路由类型?/// /// Send前完成交换机初始化/// </summary>/// <param name="exchangeName"></param>/// <param name="message">建议Json格式</param>public void Send(string exchangeName, string message){this.InitExchange(exchangeName);if (_CurrentConnection == null || !_CurrentConnection.IsOpen){this.InitConnection();}using (var channel = _CurrentConnection.CreateModel())//开辟新的信道通信{try{channel.TxSelect();//开启Tx事务---RabbitMQ协议级的事务-----强事务var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: exchangeName,routingKey: string.Empty,basicProperties: null,body: body);channel.TxCommit();//提交Console.WriteLine($" [x] Sent {body.Length}");}catch (Exception ex){Console.WriteLine(ex.Message);Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}");channel.TxRollback(); //事务回滚--前面的所有操作就全部作废了。。。。}}}#endregion

接收信息

	#region Receive/// <summary>/// 注册处理动作/// </summary>/// <param name="rabbitMQConsumerMode"></param>/// <param name="func"></param>public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func<string, bool> func){this.InitBindQueue(rabbitMQConsumerMode);Task.Run(() =>{using (var channel = _CurrentConnection.CreateModel()){var consumer = new EventingBasicConsumer(channel);channel.BasicQos(0, 0, true);consumer.Received += (sender, ea) =>{string str = Encoding.UTF8.GetString(ea.Body.ToArray());if (func(str)){channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//确认已消费}else{channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);//放回队列--重新包装信息,放入其他队列}};channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName,autoAck: false,//不ACKconsumer: consumer);Console.WriteLine($" Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");Console.ReadLine();Console.WriteLine($" After Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}");}});}#endregion
http://www.lryc.cn/news/97404.html

相关文章:

  • mac 移动硬盘未正常退出,再次链接无法读取(显示)
  • 短视频账号矩阵系统源码开发部署路径
  • 前端 | ( 十一)CSS3简介及基本语法(上) | 尚硅谷前端html+css零基础教程2023最新
  • Kafka入门到起飞系列 - 副本机制,什么是副本因子呢?
  • 2023年基准Kubernetes报告:6个K8s可靠性失误
  • 程序员面试系列,k8s常见面试题
  • docker版jxTMS使用指南:站点的调整
  • element ui input 深层循环v-model绑定默认数据删除不了的情况
  • GBDT的参数空间与超参数优化
  • 多线程练习——抽奖箱
  • RK3399平台开发系列讲解(内核调试篇)Valgrind 内存调试与性能分析
  • Windows 11的最新人工智能应用Windows Copilot面世!
  • Mac 预览(Preview)丢失PDF标注恢复
  • 4.5. 方法的四种类型
  • 四旋翼无人机使用教程
  • 优化 PHP 数据库查询性能
  • vue 使用stompjs websocket连接rabbitmq
  • com.android.ide.common.signing.KeytoolException:
  • leetcode 1870. Minimum Speed to Arrive on Time(准时到达的最小速度)
  • 本地非文字资源无法加载
  • Java电子招投标采购系统源码-适合于招标代理、政府采购、企业采购
  • 万向节死锁
  • 大数据课程D1——hadoop的初识
  • xml命名空间
  • 七、Kafka源码分析之网络通信
  • WEB安全测试通常要考虑的测试点
  • 关于uni.createInnerAudioContext()的duration音频长度获取不到问题
  • 使用rknn-toolkit2把YOLOV5部署到OK3588上
  • 【雕爷学编程】Arduino动手做(93)--- 0.96寸OLED液晶屏模块14
  • ffplay播放器剖析(5)----视频输出剖析