当前位置: 首页 > news >正文

WebSocket --- ws模块源码解析(详解)

摘要

在这一篇文章中,写了如何在node端和web端,实现一个WebSocket通信。
WebSocket在node端和客户端的使用

而在node端里面,我们使用了ws模块来创建WebSocket和WebSocketServer,那ws模块是如何做到可以和客户端进行双向通信的呢?

426状态码

在HTTP中,426表示“Upgrade Required”,即客户端需要通过HTTP协议的升级版进行访问。这个状态码主要用在WebSockets协议中,表示客户端需要使用WebSockets协议来连接服务器。

什么意思呢?例如我们创建一个HTTP服务如果这么写:

const http = require('http')const server = http.createServer((req, res) => {const body = http.STATUS_CODES[426];res.writeHead('426', {'Content-Type': 'text/align','Content-Length': body.length})res.end(body)
})server.listen(8080)

就是告诉客户端,如果你访问我这边的服务,那么你就要进行升级服务。也就是使用WebSocket对我进行访问!

那有一个问题,如果客户端使用了WebSocket访问,服务端要怎么进行响应呢?

还直接在createServer里面的回调中处理吗?

upgrade事件

在这里面,如果客户端通过WebSocket进行访问服务端,会触发服务端server的upgrade事件,也就是说会进下面的回调函数里。

server.on('upgrade',(req, socket, head) => {// 固定格式const key = req.headers['sec-websocket-key'];const digest = createHash('sha1').update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',).digest('base64');const headers = ['HTTP/1.1 101 Switching Protocols','Upgrade: websocket','Connection: Upgrade',`Sec-WebSocket-Accept: ${digest}`];socket.write(headers.concat('\r\n').join('\r\n'));// 客户端发送的消息socket.on('data', (data) => {console.log(data.toString());})// 服务端向客户端发送消息socket.write('你好')
})

这个回调中,通过socket来进行和客户端进行双向通信。

转码

但是只有上面的例子,似乎每次拿到的数据都是乱码。这是因为WebSocket之间的通信的报文,不能通过Buffer的toString直接转码。这里提供一下在网上找到的转码方法:

server.on('upgrade', (req, socket, head) => {const key = req.headers['sec-websocket-key'];const digest = createHash('sha1').update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',).digest('base64');const headers = ['HTTP/1.1 101 Switching Protocols','Upgrade: websocket','Connection: Upgrade',`Sec-WebSocket-Accept: ${digest}`];socket.write(headers.concat('\r\n').join('\r\n'));socket.on('data',(data) => {console.log(decodeSocketFrame(data).payloadBuf.toString())socket.write(encodeSocketFrame({fin:1,opcode:1,payloadBuf:Buffer.from('你好')}))})
})function decodeSocketFrame (bufData){let bufIndex = 0const byte1 = bufData.readUInt8(bufIndex++).toString(2)const byte2 = bufData.readUInt8(bufIndex++).toString(2)console.log(byte1);console.log(byte2);const frame =  {fin:parseInt(byte1.substring(0,1),2),// RSV是保留字段,暂时不计算opcode:parseInt(byte1.substring(4,8),2),mask:parseInt(byte2.substring(0,1),2),payloadLen:parseInt(byte2.substring(1,8),2),}// 如果frame.payloadLen为126或127说明这个长度不够了,要使用扩展长度了// 如果frame.payloadLen为126,则使用Extended payload length同时为16/8字节数// 如果frame.payloadLen为127,则使用Extended payload length同时为64/8字节数// 注意payloadLen得长度单位是字节(bytes)而不是比特(bit)if(frame.payloadLen==126) {frame.payloadLen = bufData.readUIntBE(bufIndex,2);bufIndex+=2;} else if(frame.payloadLen==127) {// 虽然是8字节,但是前四字节目前留空,因为int型是4字节不留空int会溢出bufIndex+=4;frame.payloadLen = bufData.readUIntBE(bufIndex,4);bufIndex+=4;}if(frame.mask){const payloadBufList = []// maskingKey为4字节数据frame.maskingKey=[bufData[bufIndex++],bufData[bufIndex++],bufData[bufIndex++],bufData[bufIndex++]];for(let i=0;i<frame.payloadLen;i++) {payloadBufList.push(bufData[bufIndex+i]^frame.maskingKey[i%4]);}frame.payloadBuf = Buffer.from(payloadBufList)} else {frame.payloadBuf = bufData.slice(bufIndex,bufIndex+frame.payloadLen)}return frame
}function encodeSocketFrame (frame){const frameBufList = [];// 对fin位移七位则为10000000加opcode为10000001const header = (frame.fin<<7)+frame.opcode;frameBufList.push(header)const bufBits = Buffer.byteLength(frame.payloadBuf);let payloadLen = bufBits;let extBuf;if(bufBits>=126) {//65536是2**16即两字节数字极限if(bufBits>=65536) {extBuf = Buffer.allocUnsafe(8);buf.writeUInt32BE(bufBits, 4);payloadLen = 127;} else {extBuf = Buffer.allocUnsafe(2);buf.writeUInt16BE(bufBits, 0);payloadLen = 126;}}let payloadLenBinStr = payloadLen.toString(2);while(payloadLenBinStr.length<8){payloadLenBinStr='0'+payloadLenBinStr;}frameBufList.push(parseInt(payloadLenBinStr,2));if(bufBits>=126) {frameBufList.push(extBuf);}frameBufList.push(...frame.payloadBuf)return Buffer.from(frameBufList)
}

WebSocketServer的实现

有了上面的基础,基本知道双向通信是怎么做到的了。就来看一下WebSocketServer的实现。
当我们使用的时候,我们是以这种方式:

const WebSocketServer = require('ws')const wss = new WebSocketServer('8080');
wss.on('connection', (ws) => {})

我们知道,connection是httpServer的回调,为什么在WebSocketServer中可以使用呢?

export default class WebSocketServer {constructor(port) {this._server = http.createServer((req, res) => {const body = http.STATUS_CODES[426];res.writeHead('426', {'Content-Type': 'text/align','Content-Length': body.length})res.end(body)})this._server.listen(port);const connectionEmit = this.emit.bind(this, 'connection');const closeEmit = this.emit.bind(this, 'close');// 其他事件,都是http能监听到的;const map = {connection: connectionEmit,close: closeEmit}for(let emitName in map) {this._server.on(emitName, map[emitName])}}
}

在WebSocketServer中,如果客户端触发了http的事件时,它便将其转发到WebSocket实例上面。
然后再处理自己的逻辑。

http://www.lryc.cn/news/238822.html

相关文章:

  • 一文带你拿下MySQL之增删查改(基础)
  • 2023亿发数字化智能工单,专业管理工单处理全流程,助力企业转型腾飞
  • JavaScript 常用符号
  • GPT-4:论文阅读笔记
  • hm商城微服务远程调用及拆分
  • 设置指定时间之前的时间不可选
  • Java使用Redis来实现分布式锁
  • 移动端表格分页uni-app
  • 全志R128芯片RTOS调试指南
  • 超级实用的程序员接单平台,看完少走几年弯路,强推第一个!
  • 前端字符串方法汇总
  • 12 分布式锁加入看门狗
  • 怎么判断list是否为null
  • 11.数据公式中使用2个 $$ a =b $$,是什么意思?
  • 设计模式-14-迭代器模式
  • 防雷接地+防雷工程施工综合方案
  • 排序算法--选择排序
  • 【Web】Ctfshow SSRF刷题记录1
  • 【算法挨揍日记】day30——300. 最长递增子序列、376. 摆动序列
  • ROS2对比ROS1的一些变化与优势(全新安装ROS2以及编译错误处理)《1》
  • 基于单片机PM2.5监测系统仿真设计
  • CRM系统中的联系人是什么?如何进行联系人管理?
  • uniapp:如何实现点击图片可以全屏展示预览
  • python运行hhsearch二进制命令的包装器类
  • Java 网络编程、e-mail、多线程编程
  • 为虚幻引擎开发者准备的Unity指南
  • Vue 2使用element ui 表格不显示
  • C++学习 --文件
  • java/Android:将字符串按数量分割
  • JVM 监控命令详解