当前位置: 首页 > news >正文

Python 封装 socket 为 [TCP/UDP/MULTICAST] 服务端

在新线程中创建 TCP/UDP/MULTICAST 协议的服务端套接字,接收客户端的连接请求或数据,并调用 on_recv 回调函数处理数据。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import threading
import multiprocessingclass ServerSocket:def __init__(self, *, protocol: str, port: int, group: str = '', on_recv: 'function'):"""服务端套接字在新线程中创建 TCP/UDP/MULTICAST 协议的服务端套接字,接收客户端的连接请求或数据,并调用 on_recv 回调函数处理数据。TCP 断开连接的情况:- TCP 正常断开+ 客户端主动断开连接+ 通信期间正常交换数据 (若服务端返回了响应, 则客户端应该接收响应)- TCP 连接已重置+ 客户端主动断开连接+ 服务端返回了响应,但客户端未接收- TCP 连接已终止+ 未通信完毕就已经断开了连接Args:protocol (str): 协议port (int): 端口号group (str, optional): 组播地址. Defaults to ''.on_recv (function, optional): 接收到数据时的回调函数, 参数为 (data: bytes, client_name: str). Defaults to None.Raises:ValueError: 无效的端口号, 应为 [1-65535]ValueError: 无效的协议类型, 应为 [TCP, UDP, MULTICAST]"""if port < 1 or port > 65535:raise ValueError(f'ServerSocket 无效的端口号 "{port}"')if protocol not in ['TCP', 'UDP', 'MULTICAST']:raise ValueError(f'ServerSocket 无效的协议类型 "{protocol}"')if protocol == 'MULTICAST' and not group:raise ValueError(f'ServerSocket 组播协议必须指定组播地址')if protocol != 'MULTICAST' and group:raise ValueError(f'ServerSocket 协议类型 "{protocol}" 请勿设置 group 参数')self.protocol = protocolself.port = portself.group = groupself.on_recv = on_recvself.sock: socket.socket | None = Noneself.tcp_sub_socks: list[socket.socket] = []self.thread: threading.Thread | None = Noneself.__active = Falsedef __str__(self) -> str:if self.protocol == 'MULTICAST':return f'ServerSocket({self.protocol}, {self.group}:{self.port})'return f'ServerSocket({self.protocol}, {self.port})'def __del__(self) -> None:self.close()def __create_socket(self) -> None:match self.protocol:case 'TCP':self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.bind(('0.0.0.0', self.port))self.sock.listen(10)case 'UDP':self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)self.sock.bind(('0.0.0.0', self.port))case 'MULTICAST':self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.sock.bind(('0.0.0.0', self.port))self.sock.setsockopt(socket.IPPROTO_IP,socket.IP_ADD_MEMBERSHIP,socket.inet_aton(self.group) + socket.INADDR_ANY.to_bytes(4, byteorder='big'))def __send_back(self, client_addr: tuple[str, int], client_sock: socket.socket | None = None) -> 'function':def send_back(data: bytes):if self.protocol == 'TCP':return client_sock.sendto(data, client_addr)return self.sock.sendto(data, client_addr)return send_backdef __tcp_sub_thread(self, client_sock: socket.socket, client_addr: tuple[str, int]) -> None:while self.is_active():try:if not (data := client_sock.recv(1024)):print(f'{self} TCP 子线程 {client_addr} 正常断开')breakself.on_recv and self.on_recv(data=data,client_addr=client_addr,send_back=self.__send_back(client_addr, client_sock),)except ConnectionResetError:print(f'{self} TCP 子线程 {client_addr} 连接已重置')breakexcept ConnectionAbortedError:print(f'{self} TCP 子线程 {client_addr} 连接已终止')breakexcept Exception as e:if self.is_active():print(f'{self} TCP 子线程 {client_addr} 异常: \n{e}')breakif self.is_active(): # 断开或异常self.tcp_sub_socks.remove(client_sock)client_sock.close()def __main_thread(self) -> None:self.__active = Truewhile self.is_active():try:if self.protocol == 'TCP':client_sock, client_addr = self.sock.accept()self.tcp_sub_socks.append(client_sock)threading.Thread(target=self.__tcp_sub_thread, args=(client_sock, client_addr), daemon=True).start()else:data, client_addr = self.sock.recvfrom(1024)self.on_recv and self.on_recv(data=data,client_addr=client_addr,send_back=self.__send_back(client_addr),)except Exception as e:if self.is_active():print(f'{self} 主线程异常 : \n{e}')breakdef start(self, is_process: bool = False) -> bool:"""启动服务端将在新线程中运行,直到调用 close() 关闭,TCP 协议下会创建子线程处理 TCP 连接Args:is_process (bool, optional): 是否以子进程运行. Defaults to False.Returns:bool: 是否启动成功"""try:self.__create_socket()except Exception as e:print(f'{self} 创建失败: \n{e}')return Falseif is_process:self.thread = multiprocessing.Process(target=self.__main_thread, daemon=True)else:self.thread = threading.Thread(target=self.__main_thread, daemon=True)self.thread.start()return Truedef close(self) -> bool:"""关闭服务端Returns:bool: 是否关闭成功"""try:self.__active = Falseif self.protocol == 'TCP':for client_sock in self.tcp_sub_socks:client_sock.shutdown(socket.SHUT_RDWR)client_sock.close()self.tcp_sub_socks.clear()else:self.sock.shutdown(socket.SHUT_RDWR)self.sock.close()if isinstance(self.thread, multiprocessing.Process):self.thread.terminate()else:self.thread.join()except Exception as e:print(f'{self} 关闭失败: \n{e}')return Falsereturn Truedef is_active(self) -> bool:"""返回服务端是否处于活动状态Returns:bool: 是否处于活动状态"""return self.__activequit = False
def on_recv(data, client_addr, send_back):global quitprint(f'收到数据: {data} 来自 {client_addr}')send_back(b'ok')if data == b'q':quit = True# server.close()if __name__ == '__main__':from time import sleepserver = ServerSocket(protocol='TCP', port=60000, on_recv=on_recv)# server = ServerSocket(protocol='UDP', port=60000, on_recv=on_recv)# server = ServerSocket(protocol='MULTICAST', group='224.1.1.1', port=65000, on_recv=on_recv)server.start()while server.is_active():print('等待数据...')if quit:print('退出')server.close()sleep(60)
http://www.lryc.cn/news/453132.html

相关文章:

  • c++ STL库 unordered_map
  • 【接口测试】任务1:登录接口
  • 二、Spring Boot集成Spring Security之实现原理
  • 基于深度学习的点云处理模型PointNet++学习记录
  • Javascript Object.assgin()详解以及深浅拷贝
  • Redis篇(应用案例 - UV统计)(持续更新迭代)
  • 解锁微信小程序新技能:ECharts动态折线图搭配WebSocket,数据刷新快人一步!
  • 上交所服务器崩溃:金融交易背后的技术隐患暴露杭州BGP高防服务器43.228.71.X
  • P4、P4D、HelixSwarm 各种技术问题咨询
  • Linux 应用层协议HTTP
  • Python和C++混淆矩阵地理学医学物理学视觉语言模型和算法模型评估工具
  • HTTP 协议的基本格式和 fiddler 的用法
  • 【计算机网络】详解UDP协议格式特点缓冲区
  • 网络安全cybersecurity的几个新领域
  • android 原生加载pdf
  • MAE(平均绝对误差)和std(标准差)计算中需要注意的问题
  • 03实战篇:把握667分析题的阅读材料、题目
  • C++系列-多态
  • 基于C++和Python的进程线程CPU使用率监控工具
  • fish-speech语音大模型本地部署
  • 如何写出更牛的验证激励
  • EasyCVR视频汇聚平台:解锁视频监控核心功能,打造高效安全监管体系
  • 面对大文件(300G以上)如何加速上传速度
  • 基于 Redis 实现消息队列的深入解析
  • C++(string类的实现)
  • nrf 24l01使用方法
  • C语言普及难度三题
  • 10.4每日作业
  • 日常工作记录:服务器被攻击导致chattr: command not found
  • 多线程-初阶(1)