当前位置: 首页 > news >正文

thinkphp6+swoole使用rabbitMq队列

  1. 安装think-swoole
  2. 安装 composer require php-amqplib/php-amqplib,以支持rabbitMq使用
  3. 安装rabbitMq延迟队列插件
    1. 安装 rabbitmq_delayed_message_exchange 插件,按照以下步骤操作:  
      下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases以下路由不一定是一样的!!!
      将插件复制到 RabbitMQ 插件目录: 将下载的插件文件复制到 RabbitMQ 插件目录。  
      sudo cp rabbitmq_delayed_message_exchange-3.8.9.ez /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/
      将 <version> 替换为您的 RabbitMQ 服务器版本。  
      启用插件: 使用 RabbitMQ 命令行工具启用插件。  
      sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
      重启 RabbitMQ: 重启 RabbitMQ 服务器以应用更改。  
      sudo systemctl restart rabbitmq-server

  4. config目录创建 rabbitmq.php 文件,内容如下
return ['host' => '服务器地址','port' => '端口','user' => '账户','password' => '密码','vhost' => '/','exchange' => 'delayed_exchange','exchange_type' => 'direct', // 交换机类型(如 direct、fanout、topic)'exchange_arguments' => ['x-delayed-type' => 'direct'], // 延迟交换机参数
];

创建 RabbitMQService 类


class RabbitMQService
{protected $connection;protected $channel;public function __construct(){$config = config('rabbitmq');$this->connection = new AMQPStreamConnection($config['host'],$config['port'],$config['user'],$config['password'],$config['vhost']);$this->channel = $this->connection->channel();$this->channel->exchange_declare($config['exchange'],'x-delayed-message', // 指定延迟交换机类型false,true,false,false,false,new AMQPTable(['x-delayed-type' => $config['exchange_type']]) // 设置延迟交换机的底层类型);}public function publish($message, $queue, $delay = 0){// 声明队列$this->channel->queue_declare($queue, false, true, false, false);// 绑定队列到交换机$this->channel->queue_bind($queue, config('rabbitmq.exchange'));// 设置延迟头部信息$headers = new AMQPTable(['x-delay' => $delay // 延迟时间,单位为毫秒]);// 创建消息$msg = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 持久化消息]);$msg->set('application_headers', $headers); // 正确设置头信息// 发布消息到交换机$this->channel->basic_publish($msg, config('rabbitmq.exchange'));}public function consume($queue, $callback){$this->channel->queue_declare($queue, false, true, false, false);$this->channel->basic_consume($queue, '', false, true, false, false, $callback);while ($this->channel->is_consuming()) {$this->channel->wait();}}public function __destruct(){$this->channel->close();$this->connection->close();}
}

创建 RabbitMqUseService 类文件


class RabbitMqUseService
{// 消费队列public static function consumption(){$rabbitMQ = new RabbitMQService();$rabbitMQ->consume('queue', function ($msg){Log::error('消费队列'.$msg->body);$con = json_decode($msg->body,true);$class = $con['class'];Log::error("class->>".$class);if(class_exists($class)){$obj = new $class;$obj->handle($con['body']);}});}/*** @param $obj* @param $data* @param $delay* @return void*/public static function push($obj,$data,$delay = 0){$rabbitMQ = new RabbitMQService();$class = get_class($obj);// 构造消息体$message = json_encode(['class' => $class,  // 类名'body' => $data     // 具体数据]);$rabbitMQ->publish($message, 'queue', $delay * 1000);var_dump('已加入');}public function test(){self::push(new TestJob(),['name'=>'test'],10);}
}

配置消费任务

新建文件类 RabbitConsumptionHandleclass RabbitConsumptionHandle
{public function handle(){RabbitMqUseService::consumption();}
}在app/event.php listen 中引入'listen'    => ['swoole.init' => [RabbitConsumptionHandle::class]]

新增队列

      RabbitMqUseService::push(new \app\job\TestJob(),['a'=>1,'b'=>2]);

http://www.lryc.cn/news/526882.html

相关文章:

  • 大模型开发 | RAG在实际开发中可能遇到的坑
  • mybatis是什么?有什么作用?mybatis的简单使用
  • 求平均年龄(信息学奥赛一本通-1059)
  • CY T 4 BB 5 CEB Q 1 A EE GS MCAL配置 - MCU组件
  • 10 Hyperledger Fabric 介绍
  • Word 中实现方框内点击自动打 √ ☑
  • 噪声算法 纹理
  • hexo + Butterfly搭建博客
  • 05.KNN算法总结
  • CentOS 7 搭建lsyncd实现文件实时同步 —— 筑梦之路
  • java定时任务备份数据库
  • Vue.js 传递路由参数和查询参数
  • 2025数学建模美赛|F题成品论文
  • 私有包上传maven私有仓库nexus-2.9.2
  • 企业信息化4:免费开源的财务管理系统
  • PyCharm配置Python环境
  • 蓝桥杯3522 互质数的个数 | 数论
  • Effective C++ 规则49:了解 new-handler 的行为
  • 头像生成小程序搭建(免费分享)
  • 手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion(原理介绍)
  • MySQL 基础学习(2): INSERT 操作
  • openstack 客户端命令行简介
  • Oracle查看数据库表空间使用情况
  • [护网杯 2018]easy_tornado1
  • 关于java实现word(docx、doc)转html的解决方案
  • 【8】思科IOS AP升级操作
  • 【ROS2】RViz2界面类 VisualizationFrame 详解
  • 2025年01月24日Github流行趋势
  • Gradle buildSrc模块详解:集中管理构建逻辑的利器
  • 【Airsim 仿真】查找配置文件 settings json 的路径优先级