当前位置: 首页 > news >正文

物联网数据安全区块链服务

物联网数据安全区块链服务

下面是一个专为物联网数据安全设计的区块链服务实现,使用Python编写并封装为RESTful API。该服务确保物联网设备数据的不可篡改性、可追溯性和安全性。

import hashlib
import json
import time
from datetime import datetime
from uuid import uuid4
from flask import Flask, jsonify, request, Response
import requests
from urllib.parse import urlparse
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import sqlite3
import threading# --------------------- 区块链核心实现 ---------------------
class IoTBlockchain:def __init__(self):self.chain = []self.pending_transactions = []self.nodes = set()self.device_keys = {}  # 存储设备公钥 {device_id: public_key}# 创建创世区块self.create_genesis_block()# 创建数据库连接self.db_conn = sqlite3.connect('iot_blockchain.db', check_same_thread=False)self.init_db()# 自动清理线程self.cleanup_thread = threading.Thread(target=self.periodic_cleanup, daemon=True)self.cleanup_thread.start()def init_db(self):"""初始化数据库"""cursor = self.db_conn.cursor()cursor.execute('''CREATE TABLE IF NOT EXISTS device_data (data_id TEXT PRIMARY KEY,device_id TEXT,data_hash TEXT,timestamp REAL,block_index INTEGER,public_key TEXT)''')cursor.execute('''CREATE TABLE IF NOT EXISTS revoked_tokens (token TEXT PRIMARY KEY,revocation_time REAL)''')self.db_conn.commit()def periodic_cleanup(self):"""定期清理旧数据"""while True:time.sleep(3600)  # 每小时清理一次try:cursor = self.db_conn.cursor()# 保留最近48小时的数据cutoff = time.time() - 48 * 3600cursor.execute("DELETE FROM revoked_tokens WHERE revocation_time < ?", (cutoff,))self.db_conn.commit()except Exception as e:print(f"清理错误: {e}")def create_genesis_block(self):"""创建创世区块"""genesis_block = {'index': 0,'timestamp': time.time(),'transactions': [],'proof': 100,'previous_hash': '0','merkle_root': '0'}self.chain.append(genesis_block)def register_device(self, device_id, public_key):"""注册物联网设备"""if device_id in self.device_keys:return Falseself.device_keys[device_id] = public_keyreturn Truedef verify_signature(self, device_id, data, signature):"""验证设备签名"""if device_id not in self.device_keys:return Falsepublic_key = self.device_keys[device_id]try:# 在实际应用中应使用更安全的验证方法# 这里简化实现pub_key = serialization.load_pem_public_key(public_key.encode(),backend=default_backend())# 在实际应用中应使用pub_key.verify()方法# 这里简化验证过程calculated_hash = self.hash_data(data)return calculated_hash == signatureexcept:return Falsedef hash_data(self, data):"""计算数据的哈希值"""if isinstance(data, dict):data_str = json.dumps(data, sort_keys=True)else:data_str = str(data)return hashlib.sha256(data_str.encode()).hexdigest()def create_merkle_root(self, transactions):"""创建Merkle树根哈希"""if not transactions:return "0"# 计算所有交易的哈希hashes = [self.hash_data(tx) for tx in transactions]while len(hashes) > 1:new_hashes = []# 两两配对计算哈希for i in range(0, len(hashes), 2):if i + 1 < len(hashes):combined = hashes[i] + hashes[i + 1]else:combined = hashes[i] + hashes[i]  # 奇数个时复制最后一个new_hashes.append(hashlib.sha256(combined.encode()).hexdigest())hashes = new_hashesreturn hashes[0]def new_transaction(self, device_id, data, signature):"""创建新的物联网数据交易"""# 验证签名if not self.verify_signature(device_id, data, signature):return None# 生成唯一数据IDdata_id = str(uuid4())# 存储到数据库(在实际应用中应使用更安全的存储)cursor = self.db_conn.cursor()cursor.execute("INSERT INTO device_data (data_id, device_id, data_hash, timestamp, public_key) VALUES (?, ?, ?, ?, ?)",(data_id, device_id, self.hash_data(data), time.time(), self.device_keys[device_id])self.db_conn.commit()# 添加到待处理交易transaction = {'data_id': data_id,'device_id': device_id,'data_hash': self.hash_data(data),'timestamp': time.time(),'signature': signature}self.pending_transactions.append(transaction)return data_iddef mine(self):"""挖矿创建新区块"""if not self.pending_transactions:return Nonelast_block = self.last_blocklast_proof = last_block['proof']proof = self.proof_of_work(last_proof)# 创建Merkle根merkle_root = self.create_merkle_root(self.pending_transactions)# 创建新区块block = {'index': len(self.chain),'timestamp': time.time(),'transactions': self.pending_transactions,'proof': proof,'previous_hash': self.hash_block(last_block),'merkle_root': merkle_root}# 重置待处理交易self.pending_transactions = []# 添加到区块链self.chain.append(block)return blockdef proof_of_work(self, last_proof):"""简单的工作量证明算法"""proof = 0while not self.valid_proof(last_proof, proof):proof += 1return proofdef valid_proof(self, last_proof, proof):"""验证工作量证明"""guess = f'{last_proof}{proof}'.encode()guess_hash = hashlib.sha256(guess).hexdigest()# 调整难度:要求前3位为0(可根据需求调整)return guess_hash[:3] == "000"def hash_block(self, block):"""计算区块的哈希值"""block_string = json.dumps(block, sort_keys=True).encode()return hashlib.sha256(block_string).hexdigest()@propertydef last_block(self):"""获取最后一个区块"""return self.chain[-1]def validate_chain(self):"""验证区块链的完整性"""for i in range(1, len(self.chain)):current_block = self.chain[i]previous_block = self.chain[i-1]# 检查区块哈希是否正确if current_block['previous_hash'] != self.hash_block(previous_block):return False# 检查工作量证明if not self.valid_proof(previous_block['proof'], current_block['proof']):return False# 验证Merkle根calculated_merkle = self.create_merkle_root(current_block['transactions'])if calculated_merkle != current_block['merkle_root']:return False# 验证交易签名for tx in current_block['transactions']:if tx['device_id'] not in self.device_keys:return False# 在实际应用中应使用公钥验证签名# 这里简化验证过程stored_data = self.get_data_by_id(tx['data_id'])if not stored_data or stored_data['data_hash'] != tx['data_hash']:return Falsereturn Truedef get_data_by_id(self, data_id):"""根据ID获取数据记录"""cursor = self.db_conn.cursor()cursor.execute("SELECT * FROM device_data WHERE data_id = ?", (data_id,))row = cursor.fetchone()if row:return {'data_id': row[0],'device_id': row[1],'data_hash': row[2],'timestamp': row[3],'block_index': row[4],'public_key': row[5]}return Nonedef revoke_token(self, token):"""撤销JWT令牌"""cursor = self.db_conn.cursor()cursor.execute("INSERT OR REPLACE INTO revoked_tokens (token, revocation_time) VALUES (?, ?)",(token, time.time()))self.db_conn.commit()return Truedef is_token_revoked(self, token):"""检查令牌是否已被撤销"""cursor = self.db_conn.cursor()cursor.execute("SELECT token FROM revoked_tokens WHERE token = ?", (token,))return cursor.fetchone() is not None# --------------------- Flask应用和API ---------------------
app = Flask(__name__)
blockchain = IoTBlockchain()# 生成RSA密钥对用于API认证
private_key = rsa.generate_private_key(public_exponent=65537,key_size=2048,backend=default_backend()
)
public_key = private_key.public_key()# 序列化密钥
private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM,format=serialization.PrivateFormat.PKCS8,encryption_algorithm=serialization.NoEncryption()
).decode()public_pem = public_key.public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()# API密钥(仅用于演示)
API_KEYS = {"admin": "supersecretkey123","device_manager": "devicekey456"
}# JWT认证装饰器
def jwt_required(f):def decorated_function(*args, **kwargs):token = Noneif 'Authorization' in request.headers:token = request.headers['Authorization'].split(" ")[1]if not token:return jsonify({'message': 'Token is missing'}), 401if blockchain.is_token_revoked(token):return jsonify({'message': 'Token has been revoked'}), 401try:# 在实际应用中应验证签名# 这里简化验证过程data = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False})request.current_user = data['sub']except:return jsonify({'message': 'Token is invalid'}), 401return f(*args, **kwargs)return decorated_function@app.route('/api/auth', methods=['POST'])
def authenticate():"""获取JWT令牌"""auth_data = request.get_json()if not auth_data or 'api_key' not in auth_data:return jsonify({'message': 'Missing API key'}), 400api_key = auth_data['api_key']if api_key not in API_KEYS.values():return jsonify({'message': 'Invalid API key'}), 401# 确定用户角色role = "user"for k, v in API_KEYS.items():if v == api_key:role = kbreak# 生成JWT令牌(在实际应用中应设置合理过期时间)token = jwt.encode({'sub': role,'iat': datetime.utcnow(),# 'exp': datetime.utcnow() + timedelta(hours=1)  # 设置过期时间}, 'secret', algorithm='HS256')  # 在实际应用中应使用更安全的密钥return jsonify({'token': token})@app.route('/api/devices/register', methods=['POST'])
@jwt_required
def register_device():"""注册物联网设备"""if request.current_user != 'admin':return jsonify({'message': 'Unauthorized'}), 403data = request.get_json()if not data or 'device_id' not in data or 'public_key' not in data:return jsonify({'message': 'Missing device ID or public key'}), 400device_id = data['device_id']public_key = data['public_key']if blockchain.register_device(device_id, public_key):return jsonify({'message': f'Device {device_id} registered successfully'}), 201else:return jsonify({'message': f'Device {device_id} already registered'}), 400@app.route('/api/data/submit', methods=['POST'])
@jwt_required
def submit_data():"""提交物联网数据"""data = request.get_json()if not data or 'device_id' not in data or 'data' not in data or 'signature' not in data:return jsonify({'message': 'Missing required fields'}), 400device_id = data['device_id']sensor_data = data['data']signature = data['signature']data_id = blockchain.new_transaction(device_id, sensor_data, signature)if data_id:return jsonify({'message': 'Data submitted successfully','data_id': data_id}), 201else:return jsonify({'message': 'Invalid device signature'}), 400@app.route('/api/chain/mine', methods=['POST'])
@jwt_required
def mine_block():"""挖矿创建新区块"""if request.current_user != 'admin':return jsonify({'message': 'Unauthorized'}), 403block = blockchain.mine()if block:return jsonify({'message': 'New block mined','block': block}), 201else:return jsonify({'message': 'No transactions to mine'}), 400@app.route('/api/chain', methods=['GET'])
@jwt_required
def get_full_chain():"""获取完整区块链"""return jsonify({'chain': blockchain.chain,'length': len(blockchain.chain)}), 200@app.route('/api/data/<data_id>', methods=['GET'])
@jwt_required
def get_data(data_id):"""根据ID获取数据记录"""data_record = blockchain.get_data_by_id(data_id)if data_record:# 在实际应用中应返回更多信息return jsonify({'data_id': data_record['data_id'],'device_id': data_record['device_id'],'timestamp': data_record['timestamp'],'block_index': data_record['block_index']}), 200else:return jsonify({'message': 'Data not found'}), 404@app.route('/api/validate', methods=['GET'])
@jwt_required
def validate_chain():"""验证区块链完整性"""is_valid = blockchain.validate_chain()if is_valid:return jsonify({'message': 'Blockchain is valid'}), 200else:return jsonify({'message': 'Blockchain is invalid'}), 400@app.route('/api/auth/revoke', methods=['POST'])
@jwt_required
def revoke_token():"""撤销当前令牌"""token = request.headers['Authorization'].split(" ")[1]if blockchain.revoke_token(token):return jsonify({'message': 'Token revoked successfully'}), 200else:return jsonify({'message': 'Failed to revoke token'}), 400@app.route('/api/public_key', methods=['GET'])
def get_public_key():"""获取API公钥(用于客户端验证)"""return Response(public_pem, mimetype='text/plain')# --------------------- 设备模拟器 ---------------------
class IoTDeviceSimulator:def __init__(self, device_id):self.device_id = device_id# 生成设备密钥对self.private_key = rsa.generate_private_key(public_exponent=65537,key_size=2048,backend=default_backend())self.public_key = self.private_key.public_key().public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo).decode()def sign_data(self, data):"""签名数据(简化实现)"""# 在实际应用中应使用私钥签名# 这里返回数据的哈希作为简化签名if isinstance(data, dict):data_str = json.dumps(data, sort_keys=True)else:data_str = str(data)return hashlib.sha256(data_str.encode()).hexdigest()def generate_data(self):"""生成模拟传感器数据"""return {'temperature': round(20 + 10 * (time.time() % 1), 2),'humidity': round(40 + 30 * ((time.time() + 0.3) % 1), 2),'pressure': round(980 + 40 * ((time.time() + 0.7) % 1), 2),'timestamp': time.time()}# --------------------- 主程序 ---------------------
if __name__ == '__main__':import argparseparser = argparse.ArgumentParser(description='IoT Blockchain API Server')parser.add_argument('-p', '--port', type=int, default=5000, help='Port to run the server on')parser.add_argument('-d', '--debug', action='store_true', help='Run in debug mode')args = parser.parse_args()# 注册一个模拟设备sim_device = IoTDeviceSimulator("sensor-001")blockchain.register_device(sim_device.device_id, sim_device.public_key)# 启动Flask应用app.run(host='0.0.0.0', port=args.port, debug=args.debug)

功能说明

1. 核心区块链功能

  • 设备注册:物联网设备使用公钥注册到系统
  • 数据提交:设备提交带签名的传感器数据
  • 区块挖矿:将待处理数据打包进新区块
  • Merkle树:用于高效验证区块中的交易
  • 工作量证明:简单的PoW共识机制

2. 安全特性

  • 设备身份验证:每个设备使用公私钥对进行身份验证
  • 数据签名:设备对提交的数据进行签名
  • API认证:使用JWT令牌保护API端点
  • 令牌撤销:支持撤销已发放的JWT令牌

3. 数据管理

  • SQLite数据库:存储设备数据和撤销令牌
  • 数据检索:通过数据ID查询数据记录
  • 自动清理:定期清理旧数据

4. RESTful API 端点

端点方法描述认证
/api/authPOST获取JWT令牌API密钥
/api/devices/registerPOST注册新设备admin令牌
/api/data/submitPOST提交传感器数据有效JWT
/api/chain/minePOST挖矿创建新区块admin令牌
/api/chainGET获取完整区块链有效JWT
/api/data/<data_id>GET获取数据记录有效JWT
/api/validateGET验证区块链完整性有效JWT
/api/auth/revokePOST撤销当前令牌有效JWT
/api/public_keyGET获取API公钥

部署和使用指南

1. 启动服务

python iot_blockchain.py

2. 获取API令牌

curl -X POST http://localhost:5000/api/auth \-H "Content-Type: application/json" \-d '{"api_key": "supersecretkey123"}'

3. 注册物联网设备

curl -X POST http://localhost:5000/api/devices/register \-H "Authorization: Bearer <JWT_TOKEN>" \-H "Content-Type: application/json" \-d '{"device_id": "sensor-002", "public_key": "-----BEGIN PUBLIC KEY-----\n..."}'

4. 提交传感器数据

curl -X POST http://localhost:5000/api/data/submit \-H "Authorization: Bearer <JWT_TOKEN>" \-H "Content-Type: application/json" \-d '{"device_id": "sensor-001","data": {"temperature": 25.5, "humidity": 60},"signature": "<DATA_SIGNATURE>"}'

5. 挖矿创建新区块

curl -X POST http://localhost:5000/api/chain/mine \-H "Authorization: Bearer <JWT_TOKEN>"

6. 查看区块链

curl http://localhost:5000/api/chain \-H "Authorization: Bearer <JWT_TOKEN>"

系统架构图

加密数据
物联网设备
区块链网关
区块链API
区块链核心
区块数据
设备注册
交易池
数据库
设备数据
撤销令牌
认证服务

实际应用建议

  1. 增强安全性

    • 使用硬件安全模块(HSM)管理密钥
    • 实现真正的数字签名验证
    • 添加传输层加密(HTTPS)
  2. 性能优化

    • 使用更高效的共识算法(如PoS)
    • 实现分片技术处理大量设备
    • 使用分布式数据库(如Cassandra)
  3. 扩展功能

    • 添加设备管理面板
    • 实现数据分析和告警功能
    • 支持设备固件验证
  4. 存储优化

    • 使用IPFS存储大型传感器数据
    • 实现数据压缩和聚合
    • 添加时间序列数据库支持
  5. 隐私保护

    • 实现零知识证明验证
    • 添加数据脱敏功能
    • 支持差分隐私

这个区块链服务为物联网数据提供了强大的安全保障,确保数据的完整性和不可篡改性,同时通过API接口提供了灵活的集成方式。

http://www.lryc.cn/news/580154.html

相关文章:

  • DeepSeek-R1知识蒸馏和微调实践(一)源码
  • 使用 C# 发送电子邮件(支持普通文本、HTML 和附件)
  • BEVFormer模型处理流程
  • 佰力博科技与您探讨表面电阻的测试方法及应用领域
  • Java程序员短时间内如何精通Redis?
  • 基于大模型的强直性脊柱炎全周期预测与诊疗方案研究
  • Spring Boot + 本地部署大模型实现:安全性与可靠性保障
  • 基于Linux的Spark本地模式环境搭建实验指南
  • RabbitMQ 4.1.1初体验
  • Ubuntu Linux Cursor 安装与使用一
  • Web前端数据可视化:ECharts高效数据展示完全指南
  • 【C#】入门
  • Linux三剑客:grep、sed、awk 详解以及find区别
  • 大语言模型预训练数据——数据采样方法介绍以GPT3为例
  • 基于Apache MINA SSHD配置及应用
  • CppCon 2018 学习:OOP is dead, long live Data-oriented design
  • ABP VNext + RediSearch:微服务级全文检索
  • PyCharm 安装使用教程
  • Rust异步爬虫实现与优化
  • 全星 QMS:制造业全面质量管理的数字化全能平台
  • 鸿蒙系统(HarmonyOS)应用开发之手势锁屏密码锁(PatternLock)
  • Jenkins-Publish HTML reports插件
  • 接口测试之postman
  • ZigBee通信技术全解析:从协议栈到底层实现,全方位解读物联网核心无线技术
  • 区块链技术核心组件及应用架构的全面解析
  • 7.4_面试_JAVA_
  • 【PyTorch】PyTorch预训练模型缓存位置迁移,也可拓展应用于其他文件的迁移
  • 基于PHP+MySQL实现(Web)英语学习与测试平台
  • 408第三季part2 - 计算机网络 - 计算机网络基本概念
  • 金融平衡术:创新与合规的突围之路