当前位置: 首页 > news >正文

flask框架-[实现websocket]:将socketio处理函数部分集中管理,使用类的方式来管理,集中管理socketio处理函数

一、项目依赖

APScheduler==3.10.4
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyJWT==2.3.0
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0 #额外修改
Werkzeug==2.0.2 #额外修改

注意:在flask2.x版本依赖,不再支持flask_script了

flask2.x版本会自动注册 flask run 和flask db 两个命令行命令

1、启动项目

flask run --host 0.0.0.0 --port 9000

2、数据库迁移命令

flask db init

flask db migrate

flask db upgrade

二、项目结构

apps

        __init__.py  : 创建app应用,各种注册

        websocket

                consumers.py   #将所有socketio处理类放到这里

base

        settings.py

ext

        __init__.py:  拓展对象都放在这里

        config.py : 拓展对象的配置内容

app.py

三、具体代码使用

settings.py

import os
import datetime
def get_database(dic):'"mysql+pymysql://root:Huawei@123@localhost:3306/study_flask?charset=utf8"'engine = dic.get('ENGINE')driver = dic.get('DRIVER')user = dic.get('USER')host = dic.get('HOST')password = dic.get("PASSWORD")port = dic.get('PORT')name = dic.get('NAME')dbinfo = f"{engine}+{driver}://{user}:{password}@{host}:{port}/{name}?charset=utf8"return dbinfo#全局通用配置类
class Config:"""项目配置核心类"""DEBUG=TrueLOG_LEVEL = "INFO"SECRET_KEY= '8hdj^sasdas6736475#$#5&GHG'BASE_PATH = os.path.dirname(os.path.abspath(__file__))STATIC_PATH = os.path.join(BASE_PATH, 'static') #static/ 路由对应的目录TEMPLATES_PATH = os.path.join(BASE_PATH, 'templates')  # 用于专门检索静态文件的位置,方便修改# 中文乱码JSON_AS_ASCII = False# # 配置redis# # 项目上线以后,这个地址就会被替换成真实IP地址,mysql也是# REDIS_HOST = 'your host'# REDIS_PORT = your port# REDIS_PASSWORD = 'your password'# REDIS_POLL = 10#数据库配置dbinfo={'ENGINE':'mysql','DRIVER':'pymysql','USER':'root','PASSWORD':'123456','HOST':"127.0.0.1","PORT":"3306",'NAME':'flask_obj'}# 数据库连接格式SQLALCHEMY_DATABASE_URI = get_database(dbinfo)# 动态追踪修改设置,如未设置只会提示警告SQLALCHEMY_TRACK_MODIFICATIONS = False# 查询时会显示原始SQL语句SQLALCHEMY_ECHO = False# 数据库连接池的大小SQLALCHEMY_POOL_SIZE=100#指定数据库连接池的超时时间SQLALCHEMY_POOL_TIMEOUT=30# 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。SQLALCHEMY_MAX_OVERFLOW=2#配置日志时,需要设置False,只能flask才能捕获移除写到日志文件中PROPAGATE_EXCEPTIONS = False#配置时区TIMEZONE = local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfoclass DevConfig(Config):DEBUG = Trueclass TestConfig(Config):DEBUG = Trueclass Online(Config):DEBUG = Falseenvs = {'dev':DevConfig,'test':TestConfig,'online':Online,'default':Config
}

3.1、ext配置

1、ext/__init__.py

from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from flask_restful import Api
from flask_caching import Cache
from flask_socketio import SocketIOdb = SQLAlchemy() #数据库对象
cors = CORS() #跨域
cache = Cache() #缓存
socketio = SocketIO()#websocket对象

2、ext/config.py

#跨域的配置
cors_config = {"origins": "*", #所有域都允许"expose_headers": ["Content-Type", "token","x-requested-with"], #跨域请求头允许的"methods": ["GET", "POST","PUT","PATCH","OPTIONS","DELETE"], #跨域允许的请求方式"supports_credentials": True, #允许在cookies跨域
}#cache缓存配置-使用redis数据库
cache_config_redis = {'CACHE_TYPE':'redis','CACHE_REDIS_HOST':'127.0.0.1','CACHE_REDIS_PORT':6637,# 'CACHE_REDIS_PASSWORD':'密码', #如果redis配置了密码'CACHE_REDIS_DB':0, #指定使用的redis的db,默认0-15
}#cache缓存配置-使用内存
cache_config_mem = {'CACHE_TYPE':'simple'#使用内存作为cache
}

3.2、apps配置

1、__init__.py

import logging
import eventlet
from flask import Flask,jsonify#导入跨域对象
from ext import cors
#导入db对象
from ext import db
#导入cache
from ext import cache
#导入socketio
from ext import socketio
#导入拓展的配置内容
from ext.config import cache_config_redis,cache_config_mem,cors_config#导入中间件
from base.middleware import after_request  #响应前执行的中间件#导入配置字典
from base.settings import envs#导入日志
from base.logger import getLogHandlerFile
from base.logger import getLogHanderTime#定时任务
from base.scheduler import SchedulerManage#注册socketio的命名空间
from apps.websocket.consumers import TotalWebsocketNamespace#导入蓝图
from apps.user.urls import user_bpdef create_app():#创建一个flask实例,传递__name__ ,是把当前文路径作为flask实例的根路径#static和templates都是创建在该路径下的app = Flask(__name__,static_folder='../static',template_folder='../templates') #static目录位置是上层的staticeventlet.monkey_patch()  # 开启补丁机制'基本配置'#导入配置从类中app.config.from_object(envs.get('default'))'日志配置'app.logger.addHandler(getLogHanderTime()) #基于时间app.logger.addHandler(getLogHandlerFile()) #基于文件大小app.logger.setLevel(logging.INFO)'中间件'#每次响应前都先设置好响应头,做好跨域app.after_request(after_request) #【1、使用中间件解决跨域】#执行请求处理前的中间件,# app.before_request(before_request)'拓展配置'#配置db对象 将db对象与app进行绑定,orm对象与app绑定db.init_app(app)#配置跨域,supports_credentials=True 允许携带cookies等信息cors.init_app(app,**cors_config) #【2、使用三方扩展解决跨域】#配置缓存cache.init_app(app=app,config=cache_config_mem)#配置socketiosocketio.init_app(app,cors_allowed_origins='*')'socketio注册命名空间的位置: 必须在这个py文件中注册'socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间'蓝图注册'app.register_blueprint(user_bp)'异常处理'@app.errorhandler(Exception)def handle_exception(e):# 将异常错误写到日志文件中app.logger.exception(str(e))# print(e, type(e))# 对异常错误的响应,使用api的格式return jsonify(code=500, message=str(e)), 500'定时任务'# SchedulerManage()return app

在本文中,最重要的就是将socketio命名空间的注册,放到create_app函数中了。

2、websocket/consumers.py

from flask import  render_template, request, jsonify
from flask_socketio import SocketIO, send, emit, join_room
from ext import socketio
from flask_socketio import Namespace
'''
一、非群聊功能,前端需要实时更新某些数据使用
1、返回html页面
2、主动发送websocket到后端,后端返回数据给请求的用户
3、调用某个视图函数,在视图函数中,给所有连接推送新的数据
'''class TotalWebsocketNamespace(Namespace):def on_handle_data(self,data):print(data, '接收浏览器发送的数据')# 1、给发送给后端的websocket,发送数据,单独给这个websocket发送# socketio.emit('handle_data', {'data':'返回的数据','type':'user','msg':'单独返回'})emit('handle_data', {'data': '返回的数据', 'type': 'user', 'msg': '主动请求时,返回的数据'})def on_connect(self):print('connect连接')token = request.args.get('token')sid = request.sidprint(request.args, 'args')# print('连接的sid',request.sid)if token == '123456':socketio.emit('success', '验证token成功')join_room('default')  # 加入到默认的房间中了# 表明连接成功else:print('token验证失败')# 阻止连接return False

四、总结

如何将socketio处理函数集中管理,将所有的处理类集中到一个py文件中。

1、创建一个包,在包中创建一个py文件,将所有处理socketio的类都写在这里

2、在创建flask的app应用中,注册socketio的命名空间:在create_app函数中,注册命名空间

'socketio注册命名空间的位置: 必须在这个py文件中注册'
socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间

建议采用这种方式来管理socketio,在后续的维护会比较容易,拓展起来也简单。不建议使用函数的方式,这样就必须将函数都写在app.py文件中,不然后端无法监听到前端发送的websocket的请求。

五、错误写法

1、直接在consumers.py同级目录下创建一个routings.py,将命名空间的注册写到该py文件中,这样后端是无法接收到前端的websocket请求的。

2、使用函数的方式来处理逻辑,且把函数都集中放到了websocket/consumers.py文件中,这样后端是无法接收到前端的websocket请求的。

http://www.lryc.cn/news/190725.html

相关文章:

  • Vue的学习补充
  • vue移动端H5调起手机发送短信(兼容ios和android)
  • spring boot RabbitMq基础教程
  • springboot vue 部署至Rocky(Centos)并自启,本文部署是若依应用
  • Mysql之增删改查
  • 【考研数学】矩阵三大关系的梳理和讨论 | 等价、相似、合同
  • 在 Amazon SageMaker 上使用 ESMFold 语言模型加速蛋白质结构预测
  • node.js知识系列(4)-每天了解一点
  • can not remove .unionfs
  • 微服务 BFF 架构设计
  • 零基础学python之元组
  • 11. SpringBoot项目中参数获取与响应
  • 4+1视图与UML
  • 没用的知识增加了,尝试用文心实现褒义词贬义词快速分类
  • AWS SAP-C02教程3--网络资源
  • 【TensorFlow2 之012】TF2.0 中的 TF 迁移学习
  • mysql面试题46:MySQL中datetime和timestamp的区别
  • 【Spring Boot】RabbitMQ消息队列 — RabbitMQ入门
  • Navicat定时任务
  • 小白必备:简单几步, 使用Cpolar+Emlog在Ubuntu上搭建个人博客网站
  • 封装 Token
  • CloudCompare 二次开发(17)——点云添加均匀分布的随机噪声
  • 研发必会-异步编程利器之CompletableFuture(含源码 中)
  • 上海亚商投顾:沪指高开高走 锂电等新能源赛道大反攻
  • 力扣第235题 二又搜索树的最近公共祖先 c++
  • 时代风口中的Web3.0基建平台,重新定义Web3.0!
  • React学习笔记 001
  • 2023 | github无法访问或速度慢的问题解决方案
  • unity各种插件集合(自用)
  • 内网收集哈希传递