计算机视觉(9)-实践中遇到的问题(六路相机模型采集训练部署全流程)
文章背景:
刚开始做项目的时候,没考虑到会有六路相机以及不同相机视频流的大小不一致,导致训练模型和部署时数据流的格式不一致,耽误了我一天的时间去解决这个问题,于是总结了系统的架构以及核心代码部分,用来记录踩坑历史。
系统架构设计
1. 数据采集模块
目标:高效获取六路相机原始数据
import cv2
import threading
import time
import numpy as np
import pickle

from kafka import KafkaProducer


class MultiCameraCapture:
    """Capture frames from multiple cameras and publish them to Kafka.

    Each camera gets its own daemon thread; threads rendezvous on a barrier
    so the frames published in one round are approximately aligned in time.
    Frames are JPEG-compressed and pickled together with per-frame metadata,
    then sent to the ``raw_frames`` topic keyed by camera id.
    """

    def __init__(self, camera_urls):
        # One VideoCapture per camera URL; list order defines cam_id.
        self.cameras = [cv2.VideoCapture(url) for url in camera_urls]
        self.producer = KafkaProducer(bootstrap_servers='kafka:9092')
        # A Barrier releases all capture threads at once.  The original
        # Event set/wait/clear scheme raced: several waiters could clear the
        # event while others were still blocked on it, losing sync rounds.
        self.sync_barrier = threading.Barrier(len(self.cameras))
        self.timestamps = [0] * len(self.cameras)

    def _capture_thread(self, cam_id):
        """Per-camera loop: read a frame, sync with peers, publish."""
        while True:
            ret, frame = self.cameras[cam_id].read()
            if not ret:
                continue
            # Software frame alignment.  (The original comment called this
            # "PTP"; true PTP sync must happen in camera hardware — this only
            # aligns the moment frames are published.)
            try:
                self.sync_barrier.wait(timeout=1.0)
            except threading.BrokenBarrierError:
                # A peer timed out (e.g. a camera stalled); reset and retry.
                self.sync_barrier.reset()
                continue
            timestamp = time.time_ns()
            metadata = {
                'cam_id': cam_id,
                'timestamp': timestamp,
                'frame_id': self.cameras[cam_id].get(cv2.CAP_PROP_POS_FRAMES),
            }
            # KafkaProducer requires bytes (or a configured value_serializer);
            # the original sent a raw dict, which raises TypeError at runtime.
            self.producer.send(
                'raw_frames',
                key=f'cam_{cam_id}'.encode(),
                value=self._serialize_frame(frame, metadata),
            )

    def _serialize_frame(self, frame, meta):
        """JPEG-compress *frame* and pickle it with *meta* into bytes."""
        ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
        if not ret:
            raise ValueError('JPEG encoding failed for cam %s' % meta['cam_id'])
        return pickle.dumps({'image': buffer.tobytes(), 'metadata': meta})

    def start_capture(self):
        """Start one daemon capture thread per camera."""
        for i in range(len(self.cameras)):
            threading.Thread(
                target=self._capture_thread, args=(i,), daemon=True
            ).start()
2. 边缘预处理节点
目标:实时处理原始数据,减轻中心负担
from kafka import KafkaConsumer
import pyarrow as pa
import pyarrow.parquet as pq  # pq was used below but never imported


class EdgePreprocessor:
    """Consume raw frames from Kafka, standardize them, and write
    time-aligned six-camera batches to the data lake as Parquet.
    """

    NUM_CAMERAS = 6

    def __init__(self):
        self.consumer = KafkaConsumer(
            'raw_frames',
            group_id='preprocess-group',
            bootstrap_servers='kafka:9092',
        )
        # NOTE(review): pa.hdfs.connect() is deprecated in modern pyarrow;
        # consider pyarrow.fs.HadoopFileSystem.  Kept as-is for compatibility.
        self.fs = pa.hdfs.connect()
        # Per-camera FIFO of (processed_image, metadata) awaiting sync.
        self.buffer = {i: [] for i in range(self.NUM_CAMERAS)}

    def process_frame(self, msg):
        """Decode one Kafka message, standardize it, and flush a synced batch
        once every camera has at least one pending frame.
        """
        # Assumes msg.value has already been deserialized into a dict with
        # 'image' (JPEG bytes) and 'metadata' — confirm the consumer is
        # configured with a matching value_deserializer.
        frame_data = msg.value
        # The original line was a syntax error (unbalanced parens) and passed
        # the imread flag as np.frombuffer's dtype; decode JPEG bytes properly.
        img = cv2.imdecode(
            np.frombuffer(frame_data['image'], dtype=np.uint8),
            cv2.IMREAD_COLOR,
        )
        meta = frame_data['metadata']

        # Uniform processing pipeline.
        processed = self._standardize_image(img)

        # Group frames by camera id.
        self.buffer[meta['cam_id']].append((processed, meta))

        # Flush one frame per camera as soon as all cameras have data.
        if all(self.buffer[i] for i in range(self.NUM_CAMERAS)):
            synced_batch = [self.buffer[i].pop(0)
                            for i in range(self.NUM_CAMERAS)]
            self._save_to_data_lake(synced_batch)

    def _standardize_image(self, img):
        """Bring a frame to the canonical resolution and colour space."""
        # 1. Uniform resolution.
        img = cv2.resize(img, (1280, 720))
        # 2. Colour-space normalization.  (With IMREAD_COLOR decoding these
        # branches are effectively dead — imdecode always yields 3 channels —
        # kept for callers that feed images from other sources.)
        if img.shape[2] == 1:    # grayscale
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
        elif img.shape[2] == 4:  # RGBA
            img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
        # 3. Device-specific correction (defined elsewhere in the project).
        img = self._lens_correction(img)
        return img

    def _save_to_data_lake(self, batch):
        """Write one synced multi-camera batch as a Parquet file on HDFS."""
        table = pa.Table.from_arrays(
            [
                pa.array([item[0] for item in batch]),  # image data
                pa.array([item[1] for item in batch]),  # metadata
            ],
            names=['image', 'metadata'],
        )
        path = f'/datalake/{time.strftime("%Y%m%d_%H%M%S")}.parquet'
        with self.fs.open(path, 'wb') as f:
            pq.write_table(table, f)
3. 训练数据管理
目标:高效管理多相机数据集
import tensorflow as tf
from tensorflow.data.experimental import service


class TrainingDataManager:
    """Build a distributed tf.data pipeline over the multi-camera data lake."""

    def __init__(self):
        # tf.data service endpoints for horizontally scaled preprocessing.
        self.dispatching_service = tf.data.experimental.service.DispatchServer()
        self.worker_service = tf.data.experimental.service.WorkerServer()

    def create_dataset(self):
        """Return the training dataset: list files -> parallel parse -> distribute."""
        # 1. Enumerate files in the data lake.
        # NOTE(review): the glob matches *.parquet, but records are read with
        # TFRecordDataset below — those formats are incompatible; one side of
        # the pipeline must change.  Behaviour preserved as-is.
        file_ds = tf.data.Dataset.list_files('hdfs:///datalake/*.parquet')

        # 2. Parse six-camera records from each file in parallel.
        parsed_ds = file_ds.interleave(
            lambda f: tf.data.TFRecordDataset(f).map(
                self._parse_multi_cam, num_parallel_calls=6),
            cycle_length=6,
            block_length=1,
        )

        # 3. Hand processing off to the tf.data service cluster.
        return parsed_ds.apply(
            tf.data.experimental.service.distribute(
                processing_mode="parallel_epochs",
                service="grpc://dispatch-service:5000",
            )
        )

    def _parse_multi_cam(self, example):
        """Decode one serialized example into {'cam_i': image} for i in 0..5."""
        features = {
            f'cam_{i}': tf.io.FixedLenFeature([], tf.string) for i in range(6)
        }
        parsed = tf.io.parse_single_example(example, features)
        images = {}
        for i in range(6):
            key = f'cam_{i}'
            decoded = tf.image.decode_jpeg(parsed[key], channels=3)
            images[key] = self._training_preprocess(decoded)
        return images

    def _training_preprocess(self, img):
        """Training-time augmentation plus resize to the model input size."""
        img = tf.image.random_brightness(img, 0.2)
        img = tf.image.random_contrast(img, 0.8, 1.2)
        img = tf.image.random_flip_left_right(img)
        return tf.image.resize(img, (512, 512))
4. 模型训练集群
目标:分布式训练多相机融合模型
import tensorflow as tf
from tensorflow.keras import layers, models


class MultiCameraModel(tf.keras.Model):
    """Six-camera fusion model: shared backbone -> concat -> task head.

    ``call`` expects a dict mapping camera keys (e.g. 'cam_0'..'cam_5') to
    image tensors with the backbone's expected shape.
    """

    def __init__(self):
        super().__init__()
        # Shared feature extractor applied to every camera stream.
        self.base_model = tf.keras.applications.EfficientNetB0(
            include_top=False, weights='imagenet')
        # Channel-wise fusion of the per-camera feature maps.
        self.fusion = layers.Concatenate(axis=-1)
        # Task-specific head (output width: adjust per task).
        self.task_head = models.Sequential([
            layers.GlobalAveragePooling2D(),
            layers.Dense(256, activation='relu'),
            layers.Dense(10),  # adjust for the task
        ])

    def call(self, inputs):
        # Iterate camera keys in sorted order so the fused channel order does
        # not depend on the caller's dict insertion order (the original used
        # inputs.values(), silently reordering features per caller).
        features = [self.base_model(inputs[key]) for key in sorted(inputs)]
        fused = self.fusion(features)
        return self.task_head(fused)


# Distributed training configuration
strategy = tf.distribute.MultiWorkerMirroredStrategy()

with strategy.scope():
    # Model and optimizer must be created inside the strategy scope so their
    # variables are mirrored across workers.
    model = MultiCameraModel()
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    )

# Data-parallel training.
train_data = TrainingDataManager().create_dataset().batch(32)
model.fit(train_data, epochs=10)

# Save the trained model.  Subclassed Keras models cannot be serialized to
# HDF5 (the original '.h5' call raises NotImplementedError); use the
# SavedModel directory format instead.
model.save('multi_cam_model')
5. 部署优化模块
目标:优化模型边缘部署
import tensorflow as tf
# TF-TRT conversion lives inside TensorFlow, not in the standalone `tensorrt`
# package (which exposes no TrtGraphConverter); import the real converter.
from tensorflow.python.compiler.tensorrt import trt_convert as trt


class DeploymentOptimizer:
    """Prepare a trained model for edge deployment: quantize, TRT-convert,
    and slice per camera.
    """

    def __init__(self, model_path):
        # model_path must be a SavedModel directory — both load_model and
        # TF-TRT conversion consume it.
        self.model = tf.keras.models.load_model(model_path)
        self.model_path = model_path

    def optimize_for_edge(self):
        """Run the full edge-optimization pipeline; return the TRT model."""
        # 1. Post-training quantization (produces a separate TFLite artifact;
        # it is NOT an input to the TensorRT step).
        quantized_model = self._quantize_model()
        # 2. TensorRT optimization of the SavedModel.
        trt_model = self._convert_to_trt()
        # 3. Per-camera model slicing.
        self._split_model(trt_model)
        return trt_model

    def _quantize_model(self):
        """Return FP16-quantized TFLite flatbuffer bytes for the model."""
        converter = tf.lite.TFLiteConverter.from_keras_model(self.model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_types = [tf.float16]
        return converter.convert()

    def _convert_to_trt(self):
        """Convert the on-disk SavedModel with TF-TRT at FP16 precision.

        The original passed the TFLite flatbuffer bytes as
        ``input_saved_model_dir``, which is invalid — TF-TRT consumes a
        SavedModel directory, so convert from ``self.model_path``.
        """
        trt_converter = trt.TrtGraphConverterV2(
            input_saved_model_dir=self.model_path,
            precision_mode=trt.TrtPrecisionMode.FP16,
        )
        return trt_converter.convert()

    def _split_model(self, model):
        """Export one sub-model per camera processing path.

        NOTE(review): this assumes *model* is a functional Keras model with a
        dict input and layers named 'cam_{i}_features'.  A TF-TRT conversion
        result is a concrete function, not a Keras model — verify this step
        against the actual artifact before relying on it.
        """
        for i in range(6):
            sub_model = tf.keras.Model(
                inputs=model.input[f'cam_{i}'],
                outputs=model.get_layer(f'cam_{i}_features').output,
            )
            sub_model.save(f'cam_{i}_submodel.trt')
6. 边缘推理节点
目标:实时六路视频流推理
import cv2
import tensorflow as tf
import numpy as np
import threading
import time


class EdgeInferenceNode:
    """Run per-camera feature extraction plus a fusion model on the edge."""

    NUM_CAMERAS = 6

    def __init__(self):
        # Per-camera optimized sub-models plus the fusion head.
        self.models = {
            i: tf.saved_model.load(f'cam_{i}_submodel.trt')
            for i in range(self.NUM_CAMERAS)
        }
        self.fusion_model = tf.saved_model.load('fusion_model.trt')
        # Latest feature map per camera; guarded by self.lock.
        self.buffer = {i: None for i in range(self.NUM_CAMERAS)}
        self.lock = threading.Lock()

    def start_inference(self):
        """Spawn one thread per camera stream plus the fusion thread."""
        for i in range(self.NUM_CAMERAS):
            threading.Thread(
                target=self._process_camera_stream,
                args=(i,),
                daemon=True,
            ).start()
        threading.Thread(target=self._fusion_thread, daemon=True).start()

    def _process_camera_stream(self, cam_id):
        """Read frames from one camera and cache its latest feature map."""
        cap = cv2.VideoCapture(cam_id)
        while True:
            ret, frame = cap.read()
            if not ret:
                continue
            # On-device preprocessing.
            processed = self._edge_preprocess(frame)
            # Model inference (add batch dim).
            input_tensor = tf.convert_to_tensor(processed[np.newaxis, ...])
            features = self.models[cam_id](input_tensor)
            # Publish the feature map for the fusion thread.
            with self.lock:
                self.buffer[cam_id] = features.numpy()

    def _fusion_thread(self):
        """Fuse the per-camera features whenever a full set is available."""
        while True:
            # Bug fix: the original only assigned `features` inside the
            # all-ready branch but read it unconditionally afterwards,
            # raising NameError on every pass with an incomplete set.
            features = None
            with self.lock:
                if all(self.buffer[i] is not None
                       for i in range(self.NUM_CAMERAS)):
                    features = [self.buffer[i]
                                for i in range(self.NUM_CAMERAS)]
                    # Reset the buffer for the next round.
                    self.buffer = {i: None for i in range(self.NUM_CAMERAS)}
            if features:
                fused_input = np.concatenate(features, axis=-1)
                result = self.fusion_model(tf.constant(fused_input))
                self._postprocess(result)
            else:
                # Yield briefly instead of hot-spinning while waiting.
                time.sleep(0.001)

    def _edge_preprocess(self, img):
        """Resize and scale to [0, 1] float32 for the edge models."""
        img = cv2.resize(img, (512, 512))
        return img.astype(np.float32) / 255.0

    def _postprocess(self, result):
        """Convert logits to probabilities and publish the prediction."""
        predictions = tf.nn.softmax(result).numpy()
        # NOTE(review): `can_bus` is not defined anywhere in this file —
        # it must be injected/imported in the real deployment.
        can_bus.send(predictions)
7. 实时监控与反馈
目标:闭环优化系统
class MonitoringSystem:
    """Closed-loop monitor: collect edge inference feedback, track
    performance metrics, and trigger retraining when quality degrades.

    NOTE(review): `can_bus` and `kafka`, as well as several private helpers
    (_get_corresponding_frames, _update_performance_metrics,
    _detect_performance_degradation, _auto_label, _save_to_datalake), are
    referenced here but not defined in this file.
    """

    def __init__(self):
        self.feedback_queue = []
        self.performance_metrics = {
            'inference_latency': [],
            'accuracy': [],
            'resource_usage': [],
        }

    def collect_feedback(self):
        """Continuously pull edge inference results and record them."""
        while True:
            result = can_bus.receive()
            if not result:
                continue
            # Keep the prediction together with the frames that produced it.
            self.feedback_queue.append({
                'prediction': result,
                'raw_data': self._get_corresponding_frames(),
            })
            # Performance monitoring.
            self._update_performance_metrics(result)

    def trigger_retraining(self):
        """Kick off retraining when performance degradation is detected."""
        if not self._detect_performance_degradation():
            return
        self._generate_training_set()
        # Tell the training cluster new data is waiting.
        kafka.send('retrain_request', 'new_data_available')

    def _generate_training_set(self):
        """Auto-label queued feedback and persist it to the data lake."""
        training_data = [
            {'images': item['raw_data'], 'label': self._auto_label(item)}
            for item in self.feedback_queue
        ]
        self._save_to_datalake(training_data)
系统优化策略
-
数据流优化:
- 使用Apache Kafka实现数据管道
- 基于时间戳的多相机帧同步
- 列式存储(Parquet)减少I/O开销
-
训练加速:
- 混合精度训练(FP16)
- 分布式数据并行
- 梯度累积支持大批次
-
边缘部署优化:
- 模型量化(FP16/INT8)
- TensorRT引擎优化
- 模型切片并行处理
-
持续优化机制:
- 边缘推理结果实时回传
- 检测到性能退化时自动触发重训练
- 反馈数据自动标注并生成新训练集
硬件配置建议
组件 | 推荐配置 | 说明 |
---|---|---|
采集节点 | Intel i7 + 32GB RAM + 10GbE | 处理原始6路视频流 |
边缘节点 | NVIDIA Jetson AGX Orin | 每节点处理1-2路视频 |
训练集群 | 8x A100 80GB + 1TB RAM | 分布式模型训练 |
存储系统 | Ceph集群 + 100TB NVMe | 高速数据湖存储 |
网络 | 100Gb InfiniBand | 节点间高速通信 |
这套系统设计具有以下优势:
- 端到端集成:从采集到部署全流程覆盖
- 实时性:边缘处理保证低延迟
- 可扩展性:支持从单设备到大型集群
- 自动化闭环:持续优化无需人工干预
- 多相机协同:专门优化的融合架构
实际部署时,可根据具体应用场景调整相机数量、模型架构和硬件配置。对于工业检测场景,可增加红外相机;对于自动驾驶,可增加雷达数据融合模块。