Newer
Older
go-algo-server / camera_processor.py
# camera_processor.py
import cv2
import threading
import time
import asyncio
import json
import os

from global_logger import logger


class CameraProcessor(threading.Thread):
    def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1):
        super().__init__()
        self.camera_config = camera_config
        self.model_wrapper = model_wrapper
        self.tcp_client = tcp_client
        self.http_client = http_client
        self.loop = loop  # 主线程中运行的事件循环
        self.daemon = True  # 主线程退出时自动关闭

        # 配置参数
        self.cam_id = self.camera_config.get('cam_id')
        self.gst_str = self.camera_config.get('gst_str')
        self.tcp_send_cls = self.camera_config.get('tcp_send_cls', [])

        # 摄像头状态
        self.cap = None
        self.running = True
        self.reconnect_interval = 1  # 重连间隔(秒)
        self.failure_threshold = 5  # 连续失败阈值

        # 抽帧相关参数
        self.frame_interval = self.camera_config.get('frame_interval', 1)  # 默认每帧都处理
        self.frame_count = 0

        # 批处理相关参数
        self.batch_size = batch_size  # 默认单张处理
        self.frame_buffer = []
        self.frame_info_buffer = []  # 存储帧信息以便处理后能关联回原帧

        self.frame_width = None
        self.frame_height = None
        self.frame_fps = None

        self.frames_detected = 0
        self.fps_ts = None

        # 添加图像保存相关参数
        self.save_annotated_images = self.camera_config.get('save_annotated_images', False)
        self.save_path = self.camera_config.get('save_path', './saved_images')
        self.save_interval = self.camera_config.get('save_interval', 25 * 10)  # 每隔多少帧保存一次
        self.save_count = 0
        
        # 确保保存目录存在
        if self.save_annotated_images:
            os.makedirs(self.save_path, exist_ok=True)
            logger.info(f"图像将保存到: {self.save_path}")
        
        # 最后捕获的帧
        self.last_frame = None
        self.last_frame_lock = threading.Lock()

    def _open_camera(self):
        """尝试打开摄像头,返回是否成功"""
        try:
            if self.cap is not None:
                self.cap.release()  # 确保释放之前的资源

            logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}")
            # self.cap = cv2.VideoCapture(self.gst_str)
            self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER)

            if not self.cap.isOpened():
                logger.error(f"摄像头 {self.cam_id} 打开失败")
                return False

            logger.info(f"摄像头 {self.cam_id} 打开成功")
            self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
            logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}")
            logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}")
            logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}")

            return True
        except Exception as e:
            logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}")
            return False

    def _reconnect_camera(self):
        """重新连接摄像头,直到成功"""
        while self.running:
            logger.info(f"摄像头 {self.cam_id} 正在重连...")
            if self._open_camera():
                return True
            time.sleep(self.reconnect_interval)
        return False

    def _process_frame(self, frame):
        """处理摄像头读取到的帧"""
        # 使用 YOLO 模型进行检测
        boxes = self.model_wrapper.predict(frame)
        for result in boxes:
            detections = result.boxes.data.cpu().numpy()  # [x1, y1, x2, y2, conf, cls]
            for detection in detections:
                x1, y1, x2, y2, conf, cls = detection
                label = self.model_wrapper.get_label(int(cls))

                # 只处理需要通过TCP发送的目标类别
                if label in self.tcp_send_cls:
                    data = {
                        "camera": self.cam_id,  # 使用cam_id而不是name
                        "label": label,
                        "confidence": float(conf),
                        "bbox": [float(x1), float(y1), float(x2), float(y2)]  # 确保是float类型
                    }
                    # 使用主线程的事件循环将 TCP 消息异步发送出去
                    asyncio.run_coroutine_threadsafe(
                        self.tcp_client.send(json.dumps(data)),
                        self.loop
                    )

                    # 如果有额外HTTP处理逻辑,可在此添加

    def _process_batch(self, frames, frame_infos):
        """批量处理多个帧"""
        if not frames:
            return

        # 批量推理
        batch_boxes = self.model_wrapper.batch_predict(frames)

        # 处理每个帧的结果
        for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)):
            frame = frames[idx]
            timestamp = frame_info["timestamp"]
            
            # 用于保存的带标注图像
            annotated_frame = frame.copy() if self.save_annotated_images else None
            
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy.cpu().squeeze()
                cls = int(box.cls)
                conf = float(box.conf)
                label = self.model_wrapper.get_label(cls)
                # print(label)
                
                # 只处理需要通过TCP发送的目标类别
                if label in self.tcp_send_cls:
                    data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \
                           f"{self.frame_width},{self.frame_height},FA"

                    # 使用主线程的事件循环将 TCP 消息异步发送出去
                    asyncio.run_coroutine_threadsafe(
                        self.tcp_client.send(data),
                        self.loop
                    )

                # 在复制的图像上绘制标注
                if self.save_annotated_images:
                    self._draw_box_on_image(annotated_frame, int(x1), int(y1), int(x2), int(y2), cls, conf)
            
            # 保存带标注的图像
            if self.save_annotated_images:
                self.save_count += 1
                if self.save_count % self.save_interval == 0:
                    self._save_annotated_image(annotated_frame, timestamp)
    
    def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf):
        """在图像上绘制标注框"""
        # 绘制矩形框
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        
        # 添加标签和置信度
        text = f"{label}: {conf:.2f}"
        cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        return image
    
    def _save_annotated_image(self, image, timestamp):
        """保存带标注的图像"""
        try:
            # 生成文件名:摄像头ID_时间戳.jpg
            filename = f"{self.cam_id}_{int(timestamp)}.jpg"
            filepath = f"{self.save_path}/{filename}"
            
            # 保存图像
            cv2.imwrite(filepath, image)
            logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}")
        except Exception as e:
            logger.exception(f"保存图像时发生错误: {e}")

    def _add_to_batch(self, frame):
        """将帧添加到批处理缓冲区"""
        self.frame_buffer.append(frame)
        self.frame_info_buffer.append({
            "timestamp": time.time()
        })

        # 当缓冲区达到批处理大小时处理批次
        if len(self.frame_buffer) >= self.batch_size:
            self._process_batch(self.frame_buffer, self.frame_info_buffer)
            self.log_fps(len(self.frame_buffer))
            self.frame_buffer = []
            self.frame_info_buffer = []


    def log_fps(self, frame_count):
        self.frames_detected += frame_count
        current_time = time.time()
        # 每秒输出 FPS
        if self.fps_ts is None or current_time - self.fps_ts >= 10:
            fps = self.frames_detected / 10.0
            self.frames_detected = 0
            logger.info(f"FPS (detect) for cam {self.cam_id}: {fps}")
            self.fps_ts = current_time

    def get_last_frame(self):
        """获取最后捕获的帧,如果没有则返回None
        
        Returns:
            numpy.ndarray 或 None: 返回帧的副本,或者在没有可用帧时返回None
        """
        with self.last_frame_lock:
            if self.last_frame is None:
                return None
            return self.last_frame.copy()


    def run(self):
        """摄像头处理主循环"""
        logger.info(f"摄像头处理线程 {self.cam_id} 启动")

        # 确保初始化成功
        if not self._open_camera():
            if not self._reconnect_camera():
                logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出")
                return

        failure_count = 0  # 连续读取失败计数

        while self.running:
            try:
                if self.cap is None or not self.cap.isOpened():
                    logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭")
                    if not self._reconnect_camera():
                        break
                    failure_count = 0
                    continue

                ret, frame = self.cap.read()

                if not ret:
                    failure_count += 1
                    logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}")

                    # 达到失败阈值,尝试重连摄像头
                    if failure_count >= self.failure_threshold:
                        logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...")
                        if not self._reconnect_camera():
                            break
                        failure_count = 0
                    else:
                        time.sleep(0.1)  # 短暂等待后继续尝试
                    continue
                else:
                    # 读取成功后,重置失败计数
                    failure_count = 0

                    # 保存最后一帧
                    with self.last_frame_lock:
                        self.last_frame = frame.copy()

                    # 抽帧处理
                    self.frame_count += 1
                    if self.frame_count % self.frame_interval == 0:
                        # self._process_frame(frame)
                        self._add_to_batch(frame)

            except Exception as e:
                logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}")
                failure_count += 1

                # 发生异常后,可能需要重置摄像头
                if failure_count >= self.failure_threshold:
                    logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...")
                    if not self._reconnect_camera():
                        break
                    failure_count = 0
                time.sleep(0.5)  # 异常发生后等待时间更长

        # 线程结束前释放资源
        if self.cap is not None:
            self.cap.release()

        logger.info(f"摄像头处理线程 {self.cam_id} 已退出")