# camera_processor.py import cv2 import threading import time import asyncio import json from global_logger import logger class CameraProcessor(threading.Thread): def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): super().__init__() self.camera_config = camera_config self.model_wrapper = model_wrapper self.tcp_client = tcp_client self.http_client = http_client self.loop = loop # 主线程中运行的事件循环 self.daemon = True # 主线程退出时自动关闭 # 配置参数 self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) # 摄像头状态 self.cap = None self.running = True self.reconnect_interval = 1 # 重连间隔(秒) self.failure_threshold = 5 # 连续失败阈值 # 抽帧相关参数 self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 self.frame_count = 0 # 批处理相关参数 self.batch_size = batch_size # 默认单张处理 self.frame_buffer = [] self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 self.frame_width = None self.frame_height = None self.frame_fps = None def _open_camera(self): """尝试打开摄像头,返回是否成功""" try: if self.cap is not None: self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") self.cap = cv2.VideoCapture(self.gst_str) # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") return False logger.info(f"摄像头 {self.cam_id} 打开成功") self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") return True except Exception as e: logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") return False def _reconnect_camera(self): """重新连接摄像头,直到成功""" while self.running: logger.info(f"摄像头 {self.cam_id} 正在重连...") if self._open_camera(): return True time.sleep(self.reconnect_interval) return False def _process_frame(self, frame): """处理摄像头读取到的帧""" # 使用 YOLO 模型进行检测 boxes = self.model_wrapper.predict(frame) for result in boxes: detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] for detection in detections: x1, y1, x2, y2, conf, cls = detection label = self.model_wrapper.get_label(int(cls)) # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = { "camera": self.cam_id, # 使用cam_id而不是name "label": label, "confidence": float(conf), "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 } # 使用主线程的事件循环将 TCP 消息异步发送出去 asyncio.run_coroutine_threadsafe( self.tcp_client.send(json.dumps(data)), self.loop ) # 如果有额外HTTP处理逻辑,可在此添加 def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: return # 批量推理 batch_boxes = self.model_wrapper.batch_predict(frames) # 处理每个帧的结果 for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) # print(label) # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ f"{self.frame_width},{self.frame_height},FA" # 使用主线程的事件循环将 TCP 消息异步发送出去 asyncio.run_coroutine_threadsafe( self.tcp_client.send(data), self.loop ) def _add_to_batch(self, frame): """将帧添加到批处理缓冲区""" self.frame_buffer.append(frame) self.frame_info_buffer.append({ "timestamp": time.time() }) # 当缓冲区达到批处理大小时处理批次 if len(self.frame_buffer) >= self.batch_size: self._process_batch(self.frame_buffer, self.frame_info_buffer) self.frame_buffer = [] self.frame_info_buffer = [] def run(self): """摄像头处理主循环""" logger.info(f"摄像头处理线程 {self.cam_id} 启动") # 确保初始化成功 if not self._open_camera(): if not self._reconnect_camera(): logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") return failure_count = 0 # 连续读取失败计数 while self.running: try: if self.cap is None or not self.cap.isOpened(): logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") if not self._reconnect_camera(): break failure_count = 0 continue ret, frame = self.cap.read() if not ret: failure_count += 1 logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") # 达到失败阈值,尝试重连摄像头 if failure_count >= self.failure_threshold: logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") if not self._reconnect_camera(): break failure_count = 0 else: time.sleep(0.1) # 短暂等待后继续尝试 continue else: # 读取成功后,重置失败计数 failure_count = 0 # 抽帧处理 self.frame_count += 1 if self.frame_count % self.frame_interval == 0: # self._process_frame(frame) self._add_to_batch(frame) except Exception as e: logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") failure_count += 1 # 发生异常后,可能需要重置摄像头 if failure_count >= self.failure_threshold: logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") if not self._reconnect_camera(): break failure_count = 0 time.sleep(0.5) # 异常发生后等待时间更长 # 线程结束前释放资源 if self.cap is not None: self.cap.release() logger.info(f"摄像头处理线程 {self.cam_id} 已退出")