Newer
Older
go-algo-server / camera_processor.py
zhangyingjie on 6 Mar 8 KB 初版提交(本地测试版)
# camera_processor.py
import cv2
import threading
import time
import asyncio
import json

from global_logger import logger


class CameraProcessor(threading.Thread):
    def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1):
        super().__init__()
        self.camera_config = camera_config
        self.model_wrapper = model_wrapper
        self.tcp_client = tcp_client
        self.http_client = http_client
        self.loop = loop  # 主线程中运行的事件循环
        self.daemon = True  # 主线程退出时自动关闭

        # 配置参数
        self.cam_id = self.camera_config.get('cam_id')
        self.gst_str = self.camera_config.get('gst_str')
        self.tcp_send_cls = self.camera_config.get('tcp_send_cls', [])

        # 摄像头状态
        self.cap = None
        self.running = True
        self.reconnect_interval = 1  # 重连间隔(秒)
        self.failure_threshold = 5  # 连续失败阈值

        # 抽帧相关参数
        self.frame_interval = self.camera_config.get('frame_interval', 1)  # 默认每帧都处理
        self.frame_count = 0

        # 批处理相关参数
        self.batch_size = batch_size  # 默认单张处理
        self.frame_buffer = []
        self.frame_info_buffer = []  # 存储帧信息以便处理后能关联回原帧

        self.frame_width = None
        self.frame_height = None
        self.frame_fps = None

    def _open_camera(self):
        """尝试打开摄像头,返回是否成功"""
        try:
            if self.cap is not None:
                self.cap.release()  # 确保释放之前的资源

            logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}")
            self.cap = cv2.VideoCapture(self.gst_str)
            # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER)

            if not self.cap.isOpened():
                logger.error(f"摄像头 {self.cam_id} 打开失败")
                return False

            logger.info(f"摄像头 {self.cam_id} 打开成功")
            self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
            logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}")
            logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}")
            logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}")

            return True
        except Exception as e:
            logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}")
            return False

    def _reconnect_camera(self):
        """重新连接摄像头,直到成功"""
        while self.running:
            logger.info(f"摄像头 {self.cam_id} 正在重连...")
            if self._open_camera():
                return True
            time.sleep(self.reconnect_interval)
        return False

    def _process_frame(self, frame):
        """处理摄像头读取到的帧"""
        # 使用 YOLO 模型进行检测
        boxes = self.model_wrapper.predict(frame)
        for result in boxes:
            detections = result.boxes.data.cpu().numpy()  # [x1, y1, x2, y2, conf, cls]
            for detection in detections:
                x1, y1, x2, y2, conf, cls = detection
                label = self.model_wrapper.get_label(int(cls))

                # 只处理需要通过TCP发送的目标类别
                if label in self.tcp_send_cls:
                    data = {
                        "camera": self.cam_id,  # 使用cam_id而不是name
                        "label": label,
                        "confidence": float(conf),
                        "bbox": [float(x1), float(y1), float(x2), float(y2)]  # 确保是float类型
                    }
                    # 使用主线程的事件循环将 TCP 消息异步发送出去
                    asyncio.run_coroutine_threadsafe(
                        self.tcp_client.send(json.dumps(data)),
                        self.loop
                    )

                    # 如果有额外HTTP处理逻辑,可在此添加

    def _process_batch(self, frames, frame_infos):
        """批量处理多个帧"""
        if not frames:
            return

        # 批量推理
        batch_boxes = self.model_wrapper.batch_predict(frames)

        # 处理每个帧的结果
        for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)):
            frame = frames[idx]
            timestamp = frame_info["timestamp"]
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy.cpu().squeeze()
                cls = int(box.cls)
                conf = float(box.conf)

                label = self.model_wrapper.get_label(cls)
                # print(label)

                # 只处理需要通过TCP发送的目标类别
                if label in self.tcp_send_cls:
                    data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \
                           f"{self.frame_width},{self.frame_height},FA"

                    # 使用主线程的事件循环将 TCP 消息异步发送出去
                    asyncio.run_coroutine_threadsafe(
                        self.tcp_client.send(data),
                        self.loop
                    )

    def _add_to_batch(self, frame):
        """将帧添加到批处理缓冲区"""
        self.frame_buffer.append(frame)
        self.frame_info_buffer.append({
            "timestamp": time.time()
        })

        # 当缓冲区达到批处理大小时处理批次
        if len(self.frame_buffer) >= self.batch_size:
            self._process_batch(self.frame_buffer, self.frame_info_buffer)
            self.frame_buffer = []
            self.frame_info_buffer = []

    def run(self):
        """摄像头处理主循环"""
        logger.info(f"摄像头处理线程 {self.cam_id} 启动")

        # 确保初始化成功
        if not self._open_camera():
            if not self._reconnect_camera():
                logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出")
                return

        failure_count = 0  # 连续读取失败计数

        while self.running:
            try:
                if self.cap is None or not self.cap.isOpened():
                    logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭")
                    if not self._reconnect_camera():
                        break
                    failure_count = 0
                    continue

                ret, frame = self.cap.read()

                if not ret:
                    failure_count += 1
                    logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}")

                    # 达到失败阈值,尝试重连摄像头
                    if failure_count >= self.failure_threshold:
                        logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...")
                        if not self._reconnect_camera():
                            break
                        failure_count = 0
                    else:
                        time.sleep(0.1)  # 短暂等待后继续尝试
                    continue
                else:
                    # 读取成功后,重置失败计数
                    failure_count = 0
                    # 抽帧处理
                    self.frame_count += 1
                    if self.frame_count % self.frame_interval == 0:
                        # self._process_frame(frame)
                        self._add_to_batch(frame)

            except Exception as e:
                logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}")
                failure_count += 1

                # 发生异常后,可能需要重置摄像头
                if failure_count >= self.failure_threshold:
                    logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...")
                    if not self._reconnect_camera():
                        break
                    failure_count = 0
                time.sleep(0.5)  # 异常发生后等待时间更长

        # 线程结束前释放资源
        if self.cap is not None:
            self.cap.release()

        logger.info(f"摄像头处理线程 {self.cam_id} 已退出")