当前位置: 首页 > news >正文

php:使用Ratchet类实现分布式websocket服务

一、前言

        最近需要做一个有关聊天的小程序,逻辑很简单,所以不打算用Swoole和workerman之类的,最后选择了Ratchet,因为简单易用,适合小型websocket服务。

二、问题

        但是目前我的项目是分布式环境,统一通过Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?这就涉及到了分布式websocket服务,但是我不希望太复杂,所以采用了消息队列的方式实现效果。

三、安装Ratchet类

直接使用composer安装,我用版本是"cboden/ratchet": "^0.2.8",比较老了。

composer require cboden/ratchet

四、创建websocket服务

因为php是同步阻塞型语言,通常每次请求都会从头到尾执行完成,在PHP中,所有的代码都按照顺序执行,直到脚本结束为止。但是我们要在一个进程中启动websocket服务还要监听消息队列,就需要用到ratchet中的事件循环机制,实现异步非阻塞通信效果。

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as ReactSocket;set_time_limit(0);
ini_set('default_socket_timeout', -1);
/*** Websocket_Server*/
class ControllerWebsocket_Server
{public function indexAction(){try {$port = 8083;// 创建事件循环(使用该机制实现异步非阻塞通信)$loop = LoopFactory::create();// 创建 React Socket 服务器$socket = new ReactSocket($loop);$socket->listen($port, '0.0.0.0'); // 指定监听的端口和地址// 启动 WebSocket 服务器$server = new IoServer(new WsServer(new \ModelWebsocket_Handler($loop)),$socket,$loop);// 启动事件循环$loop->run();} catch (\Exception $e) {echo $e->getMessage();}}
}

其中ModelWebsocket_Handler是封装好的websocket操作类

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;/*** websocket服务端-相关操作*/
class ModelWebsocket_Handler implements MessageComponentInterface {//数据缓存const REDIS_KEY_RESOURCE_DATA_MAP = 'h:websocket:resource:data:map';//客户端public $clients;public function __construct($loop) {$this->clients = new \SplObjectStorage();$this->subscribeMessage($loop);}/*** 连接建立时的逻辑*/public function onOpen(ConnectionInterface $conn) {$this->clients->attach($conn);echo "New connection! ({$conn->resourceId})\n";//获取连接请求的参数$params = [];$queryString = $conn->WebSocket->request->getQuery();parse_str($queryString, $params);//存储资源id相关数据$this->setResourceDataMap($conn->resourceId, $params);}/*** 收到消息时的逻辑*/public function onMessage(ConnectionInterface $from, $msg) {echo "Received message: {$msg}\n";foreach ($this->clients as $client) {if ($client === $from) {continue;}//发送消息$client->send($msg);}}/*** 连接关闭时的逻辑*/public function onClose(ConnectionInterface $conn) {$this->delResourceDataMap($conn->resourceId);$this->clients->detach($conn);echo "Connection {$conn->resourceId} has disconnected\n";}/*** 错误处理逻辑*/public function onError(ConnectionInterface $conn, \Exception $e) {echo "An error occurred: {$e->getMessage()}\n";$this->delResourceDataMap($conn->resourceId);$conn->close();}/*** 存储资源id相关数据* * @param  string  $resourceId* @param  array   $data* @return bool*/public function setResourceDataMap($resourceId, $data) {$redis = Comm_Redis::init(Comm_Redis::REDIS_TVDB, true);$rs = $redis->hSet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId, json_encode($data));return $rs;}/*** 获取资源id相关数据* * @param  string  $resourceId* @return array*/public function getResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hGet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return json_decode($rs, true) ?: [];}/*** 删除资源id相关数据* * @param  string  $resourceId* @return bool*/public function delResourceDataMap($resourceId) {$redis = Comm_Redis::init(true);$rs = $redis->hDel(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return $rs;}/*** 订阅消息*/public function subscribeMessage($loop){$loop->addPeriodicTimer(1, function () {//在这里可以使用redis订阅消息、也可以使用kafka消费消息,然后再比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用foreach ($this->clients as $client) {$client->send("测试");} });}
}

其中:subscribeMessage方法监听消息队列,收到消息之后比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用。

当然如果你能直接找到用户所连接的服务器,并且可以直接推给相应的服务器,那更好,可以节省流量开销和一些额外的逻辑处理。

http://www.lryc.cn/news/490661.html

相关文章:

  • 储能场站安全风险挑战
  • Ubuntu系统为同一逻辑网口配置不同网段的IP
  • MySQL出现Waiting for table metadata lock的原因以及解决方法(已亲测)
  • 学会Lambda,让程序Pythonic一点
  • GDPU 信息安全 期末复习
  • Python 使用 Token 认证方案连接 Kubernetes (k8s) 的详细过程
  • 【C++】ReadFile概述,及实践使用时ReadFile的速率影响研究
  • Mysql的UPDATE(更新数据)详解
  • 基于Java Springboot高校奖助学金系统
  • 如何在 Ubuntu 22.04 上安装带有 Nginx 的 ELK Stack
  • Python爬虫:深入探索1688关键词接口获取之道
  • Let‘s Encrypt SSL证书:acmessl.cn申请免费3个月证书
  • JSON Web Token (JWT)的简单介绍、验证过程及令牌刷新思路
  • xxl-job入门
  • 100.【C语言】数据结构之二叉树的堆实现(顺序结构) 1
  • 大模型 VS 大语言模型
  • Linux高阶——1117—TCP客户端服务端
  • 【Qt】Qt 在main.cpp中使用tr()函数报错
  • 面向对象高级(5)接口
  • uniapp发布android上架应用商店权限
  • Centos Stream 9安装Jenkins-2.485 构建自动化项目步骤
  • 电路模型和电路定理(二)
  • 瑞佑液晶控制芯片RA6807系列介绍 (三)软件代码详解 Part.10(让PNG图片动起来)完结篇
  • Qt常用控件 按钮
  • MySQL学习/复习10视图/用户/权限/语言连接数据库
  • vulfocus在线靶场:tomcat-pass-getshell 弱口令 速通手册
  • c#:winform调用bartender实现打印(学习整理笔记)
  • 牛客题库 21738 牛牛与数组
  • 探索PDFMiner:Python中的PDF解析利器
  • 掌握Go语言中的异常控制:panic、recover和defer的深度解析