# camera_processor.py import base64 from copy import deepcopy import cv2 import threading import time import asyncio import json import os from config import BIZ_CLASS from global_logger import logger, process_log_data from image_plotting import Annotator, colors, COLOR_RED class CameraProcessor(threading.Thread): def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1): super().__init__() self.camera_config = camera_config self.model_wrapper = model_wrapper self.tcp_client = tcp_client self.http_client = http_client self.loop = loop # 主线程中运行的事件循环 self.daemon = True # 主线程退出时自动关闭 # 配置参数 self.cam_id = self.camera_config.get('cam_id') self.gst_str = self.camera_config.get('gst_str') self.tcp_send_cls = self.camera_config.get('tcp_send_cls', []) self.alarm_send_cls = self.camera_config.get('alarm_send_cls', []) self.biz_cls = sum(BIZ_CLASS.values(), []) # 摄像头状态 self.cap = None self.running = True self.reconnect_interval = 1 # 重连间隔(秒) self.failure_threshold = 5 # 连续失败阈值 # 抽帧相关参数 self.frame_interval = self.camera_config.get('frame_interval', 1) # 默认每帧都处理 self.frame_count = 0 # 批处理相关参数 self.batch_size = batch_size # 默认单张处理 self.frame_buffer = [] self.frame_info_buffer = [] # 存储帧信息以便处理后能关联回原帧 self.frame_width = None self.frame_height = None self.frame_fps = None self.frames_detected = 0 self.fps_ts = None # 添加图像保存相关参数 self.save_annotated_images = self.camera_config.get('save_annotated_images', False) self.save_path = self.camera_config.get('save_path', './saved_images') self.save_interval = self.camera_config.get('save_interval', 25 * 10) # 每隔多少帧保存一次 self.save_count = 0 # 确保保存目录存在 if self.save_annotated_images: os.makedirs(self.save_path, exist_ok=True) logger.info(f"图像将保存到: {self.save_path}") # 最后捕获的帧 self.last_frame = None self.last_frame_lock = threading.Lock() self.last_biz_boxes = [] self.last_alarm_ts = None self.alarm_send_interval = self.camera_config.get('alarm_interval', 10) def _open_camera(self): """尝试打开摄像头,返回是否成功""" try: if self.cap is not None: self.cap.release() # 确保释放之前的资源 logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}") self.cap = cv2.VideoCapture(self.gst_str) # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER) if not self.cap.isOpened(): logger.error(f"摄像头 {self.cam_id} 打开失败") return False logger.info(f"摄像头 {self.cam_id} 打开成功") self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS)) logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}") logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}") logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}") return True except Exception as e: logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}") return False def _reconnect_camera(self): """重新连接摄像头,直到成功""" while self.running: logger.info(f"摄像头 {self.cam_id} 正在重连...") if self._open_camera(): return True time.sleep(self.reconnect_interval) return False def _process_frame(self, frame): """处理摄像头读取到的帧""" # 使用 YOLO 模型进行检测 boxes = self.model_wrapper.predict(frame) for result in boxes: detections = result.boxes.data.cpu().numpy() # [x1, y1, x2, y2, conf, cls] for detection in detections: x1, y1, x2, y2, conf, cls = detection label = self.model_wrapper.get_label(int(cls)) # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = { "camera": self.cam_id, # 使用cam_id而不是name "label": label, "confidence": float(conf), "bbox": [float(x1), float(y1), float(x2), float(y2)] # 确保是float类型 } # 使用主线程的事件循环将 TCP 消息异步发送出去 asyncio.run_coroutine_threadsafe( self.tcp_client.send(json.dumps(data)), self.loop ) def _process_batch(self, frames, frame_infos): """批量处理多个帧""" if not frames: return # 批量推理 batch_boxes = self.model_wrapper.batch_predict(frames) # 处理每个帧的结果 for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)): frame = frames[idx] timestamp = frame_info["timestamp"] # 保存最后一帧 with self.last_frame_lock: self.last_frame = frame.copy() self.last_biz_boxes = [] # 用于保存的带标注图像 annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None frame_labels = [] for box in boxes: x1, y1, x2, y2 = box.xyxy.cpu().squeeze() cls = int(box.cls) conf = float(box.conf) label = self.model_wrapper.get_label(cls) frame_labels.append(label) # print(label) # 只处理需要通过TCP发送的目标类别 if label in self.tcp_send_cls: data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \ f"{self.frame_width},{self.frame_height},FA" # 使用主线程的事件循环将 TCP 消息异步发送出去 asyncio.run_coroutine_threadsafe( self.tcp_client.send(data), self.loop ) if label in self.biz_cls: self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf)) # 在复制的图像上绘制标注 if self.save_annotated_images: annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls))) # print(self.last_biz_boxes) # 上传第三方施工报警 if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels): if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval: continue alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']] alarm_annotator = Annotator(deepcopy(frame)) for box in alarm_boxes: alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED) success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result()) if not success: logger.error(f"摄像头 {self.cam_id} 编码图像失败") continue alarm_labels = [box[5] for box in alarm_boxes] request_data = { "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( jpg_data is not None and jpg_data.size > 0) else '', "reportType": '10006', "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工', "isAlarm": 1, } self.last_alarm_ts = time.time() logger.debug(process_log_data(request_data)) # 发送HTTP请求 asyncio.run_coroutine_threadsafe( self.http_client.send([request_data]), self.loop ) # 保存带标注的图像 if self.save_annotated_images: self.save_count += 1 if self.save_count % self.save_interval == 0: annotated_frame = annotator.result() self._save_annotated_image(annotated_frame, timestamp) def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf): """在图像上绘制标注框""" # 绘制矩形框 cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) # 添加标签和置信度 text = f"{label}: {conf:.2f}" cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) return image def _save_annotated_image(self, image, timestamp): """保存带标注的图像""" try: # 生成文件名:摄像头ID_时间戳.jpg filename = f"{self.cam_id}_{int(timestamp)}.jpg" filepath = f"{self.save_path}/{filename}" # 保存图像 cv2.imwrite(filepath, image) logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}") except Exception as e: logger.exception(f"保存图像时发生错误: {e}") def _add_to_batch(self, frame): """将帧添加到批处理缓冲区""" self.frame_buffer.append(frame) self.frame_info_buffer.append({ "timestamp": time.time() }) # 当缓冲区达到批处理大小时处理批次 if len(self.frame_buffer) >= self.batch_size: self._process_batch(self.frame_buffer, self.frame_info_buffer) self.log_fps(len(self.frame_buffer)) self.frame_buffer = [] self.frame_info_buffer = [] def log_fps(self, frame_count): self.frames_detected += frame_count current_time = time.time() # 每秒输出 FPS if self.fps_ts is None or current_time - self.fps_ts >= 10: fps = self.frames_detected / 10.0 self.frames_detected = 0 logger.info(f"FPS (detect) for cam {self.cam_id}: {fps}") self.fps_ts = current_time def get_last_frame(self): """获取最后捕获的帧,如果没有则返回None Returns: numpy.ndarray 或 None: 返回帧的副本,或者在没有可用帧时返回None """ with self.last_frame_lock: if self.last_frame is None: return None, [] print(f'====================={self.last_biz_boxes}') return self.last_frame.copy(), self.last_biz_boxes.copy() def run(self): """摄像头处理主循环""" logger.info(f"摄像头处理线程 {self.cam_id} 启动") # 确保初始化成功 if not self._open_camera(): if not self._reconnect_camera(): logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出") return failure_count = 0 # 连续读取失败计数 while self.running: try: if self.cap is None or not self.cap.isOpened(): logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭") if not self._reconnect_camera(): break failure_count = 0 continue ret, frame = self.cap.read() if not ret: failure_count += 1 logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}") # 达到失败阈值,尝试重连摄像头 if failure_count >= self.failure_threshold: logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...") if not self._reconnect_camera(): break failure_count = 0 else: time.sleep(0.1) # 短暂等待后继续尝试 continue else: # 读取成功后,重置失败计数 failure_count = 0 # # 保存最后一帧 # with self.last_frame_lock: # self.last_frame = frame.copy() # 抽帧处理 self.frame_count += 1 if self.frame_count % self.frame_interval == 0: # self._process_frame(frame) self._add_to_batch(frame) except Exception as e: logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}") failure_count += 1 # 发生异常后,可能需要重置摄像头 if failure_count >= self.failure_threshold: logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...") if not self._reconnect_camera(): break failure_count = 0 time.sleep(0.5) # 异常发生后等待时间更长 # 线程结束前释放资源 if self.cap is not None: self.cap.release() logger.info(f"摄像头处理线程 {self.cam_id} 已退出")