当前位置: 首页 > news >正文

进阶向:基于Python的局域网聊天工具(端对端加密)

基于Python的局域网聊天工具(端对端加密)——从零开始实现

在现代互联网环境中,隐私和安全越来越受到重视。端对端加密(End-to-End Encryption, E2EE)技术可以确保只有通信的双方能够读取消息内容,即使是服务器也无法解密。本文将详细介绍如何用Python实现一个简单的局域网聊天工具,并为其添加端对端加密功能。

什么是端对端加密?

端对端加密是一种通信加密方式,消息在发送端加密后,只有接收端能够解密。中间的任何节点(如路由器、服务器等)都只能看到加密后的数据,无法获取原始消息内容。这种加密方式广泛应用于即时通讯工具中,如WhatsApp、Signal等。

项目概述

本项目将实现以下功能:

  • 基于Python的局域网聊天工具
  • 支持多客户端连接
  • 使用非对称加密(RSA)进行密钥交换
  • 使用对称加密(AES)加密通信内容
  • 简单的命令行界面
技术栈
  • 编程语言:Python 3.x
  • 网络库:socket、threading
  • 加密库:cryptography
  • 其他:argparse(参数解析)

核心模块解析

网络通信基础

局域网聊天工具的核心是网络通信。Python的socket库提供了低级别的网络接口,可以创建TCP或UDP连接。本项目中采用TCP协议,因为它能保证消息的可靠传输。

import socket# 创建TCP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

TCP通信需要明确的客户端和服务器端。服务器监听特定端口,客户端主动连接服务器。在聊天工具中,每个用户既是客户端(发送消息)又是服务器(接收消息)。

多线程处理

为了实现同时接收和发送消息,需要使用多线程。Python的threading模块可以方便地创建和管理线程。

import threadingdef receive_messages():while True:data = client_socket.recv(1024)print(f"Received: {data.decode()}")# 创建接收消息的线程
receive_thread = threading.Thread(target=receive_messages)
receive_thread.start()

加密实现

加密部分分为两个阶段:密钥交换和消息加密。

  1. 密钥交换:使用RSA非对称加密。每个用户生成自己的RSA密钥对,公开公钥,保存私钥。当两个用户通信时,他们交换公钥,然后用对方的公钥加密一个随机的AES密钥(会话密钥)。

  2. 消息加密:使用AES对称加密。一旦会话密钥安全交换,后续通信都使用这个密钥进行加密解密,因为对称加密比非对称加密效率高很多。

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import os# 生成RSA密钥对
private_key = rsa.generate_private_key(public_exponent=65537,key_size=2048,backend=default_backend()
)
public_key = private_key.public_key()# 生成AES密钥
aes_key = os.urandom(32)  # 256-bit key


详细实现步骤

1. 用户类设计

首先设计一个User类,包含用户的基本信息和加密相关操作。

class User:def __init__(self, name):self.name = nameself.private_key = rsa.generate_private_key(public_exponent=65537,key_size=2048,backend=default_backend())self.public_key = self.private_key.public_key()self.peer_public_key = Noneself.aes_key = Nonedef serialize_public_key(self):return self.public_key.public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo)def deserialize_public_key(self, key_bytes):self.peer_public_key = serialization.load_pem_public_key(key_bytes,backend=default_backend())

2. 密钥交换协议

设计一个简单的协议来交换公钥和会话密钥:

  1. 连接建立后,双方交换RSA公钥
  2. 一方生成AES密钥,用对方的公钥加密后发送
  3. 对方收到后用私钥解密获取AES密钥
  4. 后续通信使用AES加密
def perform_key_exchange(user, conn, is_initiator):# 交换公钥conn.sendall(user.serialize_public_key())peer_key_bytes = conn.recv(4096)user.deserialize_public_key(peer_key_bytes)if is_initiator:# 生成并发送AES密钥user.aes_key = os.urandom(32)encrypted_aes_key = user.peer_public_key.encrypt(user.aes_key,padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),algorithm=hashes.SHA256(),label=None))conn.sendall(encrypted_aes_key)else:# 接收并解密AES密钥encrypted_aes_key = conn.recv(4096)user.aes_key = user.private_key.decrypt(encrypted_aes_key,padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),algorithm=hashes.SHA256(),label=None))

3. 消息加密解密

实现AES加密解密功能:

def encrypt_message(key, message):iv = os.urandom(16)  # 初始向量cipher = Cipher(algorithms.AES(key),modes.CFB(iv),backend=default_backend())encryptor = cipher.encryptor()ciphertext = encryptor.update(message.encode()) + encryptor.finalize()return iv + ciphertextdef decrypt_message(key, ciphertext):iv = ciphertext[:16]cipher = Cipher(algorithms.AES(key),modes.CFB(iv),backend=default_backend())decryptor = cipher.decryptor()plaintext = decryptor.update(ciphertext[16:]) + decryptor.finalize()return plaintext.decode()

4. 服务器和客户端实现

将上述功能整合到服务器和客户端代码中:

def start_server(port):user = User("Server")with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.bind(('0.0.0.0', port))s.listen()conn, addr = s.accept()perform_key_exchange(user, conn, False)# 启动接收消息线程def receive():while True:data = conn.recv(4096)if not data: breakmessage = decrypt_message(user.aes_key, data)print(f"Received: {message}")threading.Thread(target=receive, daemon=True).start()# 发送消息while True:message = input("> ")encrypted = encrypt_message(user.aes_key, message)conn.sendall(encrypted)def start_client(host, port):user = User("Client")with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.connect((host, port))perform_key_exchange(user, s, True)# 启动接收消息线程def receive():while True:data = s.recv(4096)if not data: breakmessage = decrypt_message(user.aes_key, data)print(f"Received: {message}")threading.Thread(target=receive, daemon=True).start()# 发送消息while True:message = input("> ")encrypted = encrypt_message(user.aes_key, message)s.sendall(encrypted)


完整源代码

import socket
import threading
import argparse
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import osclass User:def __init__(self, name):self.name = nameself.private_key = rsa.generate_private_key(public_exponent=65537,key_size=2048,backend=default_backend())self.public_key = self.private_key.public_key()self.peer_public_key = Noneself.aes_key = Nonedef serialize_public_key(self):return self.public_key.public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo)def deserialize_public_key(self, key_bytes):self.peer_public_key = serialization.load_pem_public_key(key_bytes,backend=default_backend())def encrypt_message(key, message):iv = os.urandom(16)cipher = Cipher(algorithms.AES(key),modes.CFB(iv),backend=default_backend())encryptor = cipher.encryptor()ciphertext = encryptor.update(message.encode()) + encryptor.finalize()return iv + ciphertextdef decrypt_message(key, ciphertext):iv = ciphertext[:16]cipher = Cipher(algorithms.AES(key),modes.CFB(iv),backend=default_backend())decryptor = cipher.decryptor()plaintext = decryptor.update(ciphertext[16:]) + decryptor.finalize()return plaintext.decode()def perform_key_exchange(user, conn, is_initiator):conn.sendall(user.serialize_public_key())peer_key_bytes = conn.recv(4096)user.deserialize_public_key(peer_key_bytes)if is_initiator:user.aes_key = os.urandom(32)encrypted_aes_key = user.peer_public_key.encrypt(user.aes_key,padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),algorithm=hashes.SHA256(),label=None))conn.sendall(encrypted_aes_key)else:encrypted_aes_key = conn.recv(4096)user.aes_key = user.private_key.decrypt(encrypted_aes_key,padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),algorithm=hashes.SHA256(),label=None))def start_server(port):user = User("Server")with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.bind(('0.0.0.0', port))s.listen()print(f"Server listening on port {port}")conn, addr = s.accept()print(f"Connected by {addr}")perform_key_exchange(user, conn, False)def receive():while True:data = conn.recv(4096)if not data: breakmessage = decrypt_message(user.aes_key, data)print(f"Received: {message}")threading.Thread(target=receive, daemon=True).start()while True:message = input("> ")encrypted = encrypt_message(user.aes_key, message)conn.sendall(encrypted)def start_client(host, port):user = User("Client")with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.connect((host, port))perform_key_exchange(user, s, True)def receive():while True:data = s.recv(4096)if not data: breakmessage = decrypt_message(user.aes_key, data)print(f"Received: {message}")threading.Thread(target=receive, daemon=True).start()while True:message = input("> ")encrypted = encrypt_message(user.aes_key, message)s.sendall(encrypted)if __name__ == "__main__":parser = argparse.ArgumentParser(description="Secure LAN Chat")parser.add_argument("-s", "--server", action="store_true", help="Run as server")parser.add_argument("-c", "--client", action="store_true", help="Run as client")parser.add_argument("--host", type=str, default="localhost", help="Server host")parser.add_argument("--port", type=int, default=12345, help="Port number")args = parser.parse_args()if args.server:start_server(args.port)elif args.client:start_client(args.host, args.port)else:print("Please specify --server or --client")


使用说明

  1. 在一台机器上运行服务器:

    python chat.py --server --port 12345
    

  2. 在另一台机器上运行客户端(确保在同一局域网):

    python chat.py --client --host <服务器IP> --port 12345
    

  3. 开始聊天,输入的消息会自动加密传输


安全注意事项

  1. 本项目仅用于学习目的,不应用于生产环境
  2. 实际应用中需要更完善的密钥管理和身份验证机制
  3. 加密实现使用了cryptography库,这是Python中比较可靠的加密库
  4. 确保Python环境是最新版本,避免已知的安全漏洞

通过这个项目,您可以学习到网络编程、多线程、加密算法等多项技术。希望这篇教程能帮助您理解端对端加密的基本原理和实现方法。

http://www.lryc.cn/news/599314.html

相关文章:

  • Amazon Bedrock中的Stability AI文本转图像模型:技术原理、应用实践与未来趋势
  • 创始人IP:知识变现的核心资产
  • RAG实战指南 Day 24:上下文构建与提示工程
  • winform表格DataGridView多个单元格批量输入数字
  • 瑞萨电子RA-T MCU系列新成员RA2T1——电机控制专家
  • MySQL性能优化配置终极指南
  • 详谈OSI七层模型和TCP/IP四层模型以及tcp与udp为什么是4层,http与https为什么是7层
  • Kotlin 数据容器 - List(List 概述、创建 List、List 核心特性、List 元素访问、List 遍历)
  • STM32与ADS1220实现多通道数据采集的完整分析和源程序
  • 【WPS】office邮件合并,怎么将数据源excel中的下一条拼接在文档中的下一个位置
  • 目标导向的强化学习:问题定义与 HER 算法详解—强化学习(19)
  • Android Kotlin 协程全面指南
  • C++ : list的模拟
  • 【数据结构】长幼有序:树、二叉树、堆与TOP-K问题的层次解析(含源码)
  • 安全风险监测平台:被动应对向主动预防的转变
  • Nginx 安装与 HTTPS 配置指南:使用 OpenSSL 搭建安全 Web 服务器
  • 【IDEA】idea怎么修改注册的用户名称?
  • OAuth 2.0 安全最佳实践 (RFC 9700) password 授权类型已经不推荐使用了,将在计划中移除
  • 关于新学C++编程Visual Studio 2022开始,使用Cmake工具构建Opencv和SDK在VS里编译项目开发简介笔记
  • 《汇编语言:基于X86处理器》第9章 编程练习
  • vscode 登录ssh记住密码直接登录设置
  • vscode 字体的跟换
  • Web前端:JavaScript Math选字游戏 斯特鲁普效应测试
  • 短剧广告变现系统全栈开发指南:从架构设计到高并发实践
  • 动态规划Day1学习心得
  • RocketMQ常见问题梳理
  • kafka如何保证数据不丢失
  • 2025年7月25日训练日志
  • Elasticsearch-8.17.0 centos7安装
  • Flink 自定义类加载器和子优先类加载策略