当前位置: 首页 > news >正文

Django实时通信实战:WebSocket与ASGI全解析(下)

文章目录

    • 一、实战:构建实时聊天室
      • 环境准备
      • 实现 WebSocket 消费者
      • 配置 WebSocket 路由
      • 创建聊天界面模板
      • 实现效果
    • 二、生产环境部署
      • Nginx 配置
      • 使用 Gunicorn+Uvicorn 部署
      • 优化 Worker 数量


一、实战:构建实时聊天室

环境准备

下面将使用 Django Channels 构建一个多用户实时聊天室。Django Channels的介绍、安装与配置,参考上篇。

实现 WebSocket 消费者

创建mysite\myapp_infra\websocket\consumers.py文件,这是处理 WebSocket 连接的核心。主要实现了如下方法:

  • connect:处理新连接,验证用户token,将用户通道写入缓存,并加入默认组
  • disconnect:处理连接断开,删除用户的缓存通道记录,将用户从所在的房间组中移除
  • receive:处理接收到的消息。首先进行心跳检测(ping/pong),然后验证用户身份,解析两层 JSON 结构的消息内容。根据消息类型 demo-message-send 处理文本消息:若指定接收用户则单发,否则群发广播。
  • broadcast_message:处理群发消息,从事件中提取payload数据并发送给所有连接的客户端
  • single_message:处理单发消息,从事件中提取payload数据并发送给指定的客户端
"""WebScoket 消费者"""import json
import logging
from django.core.cache import cache
from django.conf import settings
from channels.generic.websocket import AsyncWebsocketConsumer
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenErrorlogger = logging.getLogger(__name__)class InfraConsumer(AsyncWebsocketConsumer):"""基础设施-WebSocket 异步消费者"""async def connect(self):"""当客户端发起 WebSocket 连接时调用"""query_params = self.scope["query_string"].decode()# 从查询参数获取tokentoken_param = [param.split("=")for param in query_params.split("&")if param.startswith("token=")]if not token_param or len(token_param[0]) != 2:logger.error("缺少token参数")await self.close(code=4001)  # 自定义错误码returntoken = token_param[0][1]try:# 验证Refresh Token有效性refresh = RefreshToken(token)user_id = refresh["user_id"]self.scope["user"] = {"id": user_id, "token_type": "refresh"}# 登录成功后,将用户通道写入缓存cache.set(f"user_{user_id}_channel", self.channel_name)# 加入默认组self.room_group_name = settings.DEFAULT_GROUP_NAMEawait self.channel_layer.group_add(self.room_group_name, self.channel_name)# 接受客户端的WebSocket连接请求await self.accept()except TokenError as e:logger.error("无效token")await self.close(code=4003)except Exception as e:logger.error(str(e))await self.close(code=4000)async def disconnect(self, close_code):"""当客户端断开 WebSocket 连接时调用"""# 获取当前用户user = self.scope.get("user")if user:user_id = user.get("id")# 移除用户通道cache.delete(f"user_{user_id}_channel")# 将用户从组中移除room_group_name = getattr(self, "room_group_name", None)if room_group_name:await self.channel_layer.group_discard(room_group_name, self.channel_name)async def receive(self, text_data=None, bytes_data=None):"""当接收到客户端发送的消息时调用"""# 心跳检测if text_data == "ping":await self.send(text_data="pong")returnuser = self.scope.get("user")if not user:logger.warning(f"匿名用户访问拒绝:{text_data}")returnlogger.info(f"收到用户 {user.get('id')} 发送消息:{text_data}")# 因为消息采用了两次JSON封装,这里进行两次JSON解析try:outer_payload = json.loads(text_data)  # 外层JSON解释message_type = outer_payload.get("type")raw_content = outer_payload.get("content", "{}")inner_content = json.loads(raw_content)  # 内层JSON解释except json.JSONDecodeError:logger.error("内容解析失败")return# 处理业务消息if message_type == "demo-message-send":message_text = inner_content.get("text", "").strip()if not message_text:logger.warning("消息内容不能为空")return# 构建响应字典content = {"fromUserId": user.get("id"),"text": message_text,"single": False,  # 默认群发}# 判断接收对象target_user_id = inner_content.get("toUserId")if target_user_id not in [None, "", 0]:# 单发await self._send_single_message(target_user_id, content)else:# 群发广播await self._send_broadcast_message(content)def _build_response(self, content):"""构建标准化的响应消息,进行两次JSON封装"""# 第一次封装:添加消息类型inner_message = {"type": "demo-message-receive","content": json.dumps(content),}# 第二次封装:整体序列化return json.dumps(inner_message)async def _send_single_message(self, target_user_id, content):"""向指定用户发送单条消息"""# 获取用户通道target_channel = cache.get(f"user_{target_user_id}_channel")if not target_channel:return False# 构建并发送消息message = self._build_response(content)await self.channel_layer.send(target_channel,{"type": "single.message","payload": message,},)return Trueasync def _send_broadcast_message(self, content):"""向房间内所有用户广播消息"""message = self._build_response(content)await self.channel_layer.group_send(self.room_group_name,{"type": "broadcast.message","payload": message,},)async def broadcast_message(self, event):"""处理群发消息"""payload = event["payload"]await self.send(text_data=payload)async def single_message(self, event):"""处理单发消息"""payload = event["payload"]await self.send(text_data=payload)

点击查看完整代码

配置 WebSocket 路由

创建mysite\myapp_infra\websocket\routing.py文件,定义 WebSocket 的 URL 路由

"""WebSocket 路由配置"""from django.urls import re_path, path
from .consumers import InfraConsumerwebsocket_urlpatterns = [# 调用as_asgi()类方法来获取一个 ASGI 应用程序re_path(r"^infra/ws/?$", InfraConsumer.as_asgi()),
]

然后在项目的mysite\mysite\asgi.py配置中添加 ASGI 路由

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack# 设置环境变量并初始化Django应用
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django_application = get_asgi_application()def get_websocket_application():"""延迟导入WebSocket路由(避免循环导入)"""from myapp_infra.websocket.routing import websocket_urlpatternsreturn AuthMiddlewareStack(URLRouter(websocket_urlpatterns))# 协议路由:区分HTTP和WebSocket请求
application = ProtocolTypeRouter({"http": django_application,"websocket": get_websocket_application(),}
)

点击查看完整代码

创建聊天界面模板

以Vue3界面为例,代码实现了一个 WebSocket 客户端界面,功能包括:

  • 连接控制:显示连接状态、开启/关闭 WebSocket 连接。
  • 消息发送:输入消息内容并选择接收人(单发或群发),点击发送按钮通过 WebSocket 发送消息。
  • 消息接收与展示:监听 WebSocket 返回的数据,解析不同类型的消息(如单发、群发、系统通知)并在右侧列表中倒序展示。
  • 用户列表获取:页面加载时获取用户列表用于选择消息接收人。

在这里插入图片描述

点击查看完整代码

实现效果

使用 ASGI 运行项目

# 开发环境启动
uvicorn mysite.asgi:application --reload

使用不同的浏览器,分别登录不同的用户,实现实时互发消息。

在这里插入图片描述

二、生产环境部署

Nginx 配置

使用 Nginx 作为反向代理,添加以下配置来处理 WebSocket 连接。这段配置告诉 Nginx 如何正确处理 WebSocket 升级请求。

location /ws/ {proxy_pass http://backend;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";
}

使用 Gunicorn+Uvicorn 部署

Gunicorn(全称 Green Unicorn)是一个用于 UNIX 的 高性能 Python WSGI HTTP 服务器。Gunicorn只能用于 Linux 系统,Windows上无法使用。

安装

pip install gunicorn# 查看版本
gunicorn -v
gunicorn -h

Gunicorn 作为进程管理器,配合 Uvicorn 工作进程处理 ASGI 应用

  • -w 工作进程的数量。默认启动 1 个 Worker 进程
  • -k 要运行的工作进程类型。
  • -b 指定要绑定的服务器套接字。
# 基本启动命令
gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker mysite.asgi:application

优化 Worker 数量

Gunicorn Worker 数量的设置对性能影响很大,推荐的公式是:(CPU核心数 × 2) + 1。我们可以编写一个启动脚本来自动计算最佳 Worker 数量

#!/bin/bash
# run_gunicorn.sh# 计算最佳 Worker 数
CORES=$(nproc)
WORKERS=$((CORES * 2 + 1))# 限制最大Worker数(避免内存不足)
MAX_WORKERS=8
if [ $WORKERS -gt $MAX_WORKERS ]; thenWORKERS=$MAX_WORKERS
fi# 启动命令
gunicorn -b 0.0.0.0:8000 \--workers $WORKERS \--max-requests 1000 \         # 预防内存泄漏--timeout 120 \               # 超时控制--keep-alive 5 \              # Keep-Alive-k uvicorn.workers.UvicornWorker \mysite.asgi:application

添加执行权限并运行

chmod +x run_gunicorn.sh
./run_gunicorn.sh

您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

http://www.lryc.cn/news/601346.html

相关文章:

  • 二、搭建springCloudAlibaba2021.1版本分布式微服务-Nacos搭建及服务注册和配置中心
  • mybatis的insert(pojo),会返回pojo吗
  • 激光SLAM技术综述(2025版)
  • springboot基于Java的人力资源管理系统设计与实现
  • Windows 11 安装 jdk 8
  • QT开发---网络编程下
  • 全面理解JVM虚拟机
  • Python day26
  • Python数据分析基础(一)
  • 沪深L2逐笔十档委托队列分时Tick历史数据分析处理
  • RK3568 Linux驱动学习——U-Boot使用
  • 15.7 DeepSpeed实战:单卡38GB到多卡12GB,3倍效率提升的ZeRO-3配置全解
  • golang设置http代理
  • 2025年Solar应急响应公益月赛-7月wp
  • 将 JsonArray 类型的数据导出到Excel文件里的两种方式
  • 新手向:IDM下载失败排查
  • keepalived入门及其基础运用实验
  • Java面试宝典:MySQL执行原理二
  • 字节跳动Coze Studio开源了!架构解析
  • 数据处理实战(含代码)
  • Web Worker:解锁浏览器多线程,提升前端性能与体验
  • 数据结构基础内容(第十篇:排序)
  • 力扣129. 求根节点到叶节点数字之和
  • 力扣热题100----------53最大子数组和
  • 【多模态】天池AFAC赛道四-智能体赋能的金融多模态报告自动化生成part2-报告输出
  • logstash采集springboot微服务日志
  • Spring经典“送命题”:BeanFactory vs FactoryBean
  • 力扣131:分割回文串
  • JavaScript单线程实现异步
  • 探秘CommonJS:Node.js模块化核心解析