Newer
Older
go-algo-server / camera_processor.py
zhangyingjie 4 days ago 13 KB 暂存修改
# camera_processor.py
import base64
from copy import deepcopy

import cv2
import threading
import time
import asyncio
import json
import os

from config import BIZ_CLASS
from global_logger import logger, process_log_data
from image_plotting import Annotator, colors, COLOR_RED


class CameraProcessor(threading.Thread):
    def __init__(self, camera_config, model_wrapper, tcp_client, http_client, loop, batch_size=1):
        super().__init__()
        self.camera_config = camera_config
        self.model_wrapper = model_wrapper
        self.tcp_client = tcp_client
        self.http_client = http_client
        self.loop = loop  # 主线程中运行的事件循环
        self.daemon = True  # 主线程退出时自动关闭

        # 配置参数
        self.cam_id = self.camera_config.get('cam_id')
        self.gst_str = self.camera_config.get('gst_str')
        self.tcp_send_cls = self.camera_config.get('tcp_send_cls', [])
        self.alarm_send_cls = self.camera_config.get('alarm_send_cls', [])
        self.biz_cls = sum(BIZ_CLASS.values(), [])

        # 摄像头状态
        self.cap = None
        self.running = True
        self.reconnect_interval = 1  # 重连间隔(秒)
        self.failure_threshold = 5  # 连续失败阈值

        # 抽帧相关参数
        self.frame_interval = self.camera_config.get('frame_interval', 1)  # 默认每帧都处理
        self.frame_count = 0

        # 批处理相关参数
        self.batch_size = batch_size  # 默认单张处理
        self.frame_buffer = []
        self.frame_info_buffer = []  # 存储帧信息以便处理后能关联回原帧

        self.frame_width = None
        self.frame_height = None
        self.frame_fps = None

        self.frames_detected = 0
        self.fps_ts = None

        # 添加图像保存相关参数
        self.save_annotated_images = self.camera_config.get('save_annotated_images', False)
        self.save_path = self.camera_config.get('save_path', './saved_images')
        self.save_interval = self.camera_config.get('save_interval', 25 * 10)  # 每隔多少帧保存一次
        self.save_count = 0

        # 确保保存目录存在
        if self.save_annotated_images:
            os.makedirs(self.save_path, exist_ok=True)
            logger.info(f"图像将保存到: {self.save_path}")

        # 最后捕获的帧
        self.last_frame = None
        self.last_frame_lock = threading.Lock()
        self.last_biz_boxes = []

        self.last_alarm_ts = None
        self.alarm_send_interval = self.camera_config.get('alarm_interval', 10)

    def _open_camera(self):
        """尝试打开摄像头,返回是否成功"""
        try:
            if self.cap is not None:
                self.cap.release()  # 确保释放之前的资源

            logger.info(f"摄像头 {self.cam_id} 正在打开,源: {self.gst_str}")
            self.cap = cv2.VideoCapture(self.gst_str)
            # self.cap = cv2.VideoCapture(self.gst_str, cv2.CAP_GSTREAMER)

            if not self.cap.isOpened():
                logger.error(f"摄像头 {self.cam_id} 打开失败")
                return False

            logger.info(f"摄像头 {self.cam_id} 打开成功")
            self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
            logger.info(f"摄像头{self.cam_id} frame_width={self.frame_width}")
            logger.info(f"摄像头{self.cam_id} frame_height={self.frame_height}")
            logger.info(f"摄像头{self.cam_id} frame_fps={self.frame_fps}")

            return True
        except Exception as e:
            logger.exception(f"摄像头 {self.cam_id} 打开过程中发生异常: {e}")
            return False

    def _reconnect_camera(self):
        """重新连接摄像头,直到成功"""
        while self.running:
            logger.info(f"摄像头 {self.cam_id} 正在重连...")
            if self._open_camera():
                return True
            time.sleep(self.reconnect_interval)
        return False

    def _process_frame(self, frame):
        """处理摄像头读取到的帧"""
        # 使用 YOLO 模型进行检测
        boxes = self.model_wrapper.predict(frame)
        for result in boxes:
            detections = result.boxes.data.cpu().numpy()  # [x1, y1, x2, y2, conf, cls]
            for detection in detections:
                x1, y1, x2, y2, conf, cls = detection
                label = self.model_wrapper.get_label(int(cls))

                # 只处理需要通过TCP发送的目标类别
                if label in self.tcp_send_cls:
                    data = {
                        "camera": self.cam_id,  # 使用cam_id而不是name
                        "label": label,
                        "confidence": float(conf),
                        "bbox": [float(x1), float(y1), float(x2), float(y2)]  # 确保是float类型
                    }
                    # 使用主线程的事件循环将 TCP 消息异步发送出去
                    asyncio.run_coroutine_threadsafe(
                        self.tcp_client.send(json.dumps(data)),
                        self.loop
                    )

    def _process_batch(self, frames, frame_infos):
        """批量处理多个帧"""
        if not frames:
            return

        # 批量推理
        batch_boxes = self.model_wrapper.batch_predict(frames)

        # 处理每个帧的结果
        for idx, (boxes, frame_info) in enumerate(zip(batch_boxes, frame_infos)):
            frame = frames[idx]
            timestamp = frame_info["timestamp"]

            # 保存最后一帧
            with self.last_frame_lock:
                self.last_frame = frame.copy()
            self.last_biz_boxes = []

            # 用于保存的带标注图像
            annotator = Annotator(deepcopy(frame)) if self.save_annotated_images else None

            frame_labels = []
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy.cpu().squeeze()
                cls = int(box.cls)
                conf = float(box.conf)
                label = self.model_wrapper.get_label(cls)
                frame_labels.append(label)
                # print(label)

                # 只处理需要通过TCP发送的目标类别
                if label in self.tcp_send_cls:
                    data = f"{self.cam_id},{cls},{int(x1)},{int(y1)},{int(x2)},{int(y2)}," \
                           f"{self.frame_width},{self.frame_height},FA"

                    # 使用主线程的事件循环将 TCP 消息异步发送出去
                    asyncio.run_coroutine_threadsafe(
                        self.tcp_client.send(data),
                        self.loop
                    )

                if label in self.biz_cls:
                    self.last_biz_boxes.append((x1, y1, x2, y2, cls, label, conf))

                # 在复制的图像上绘制标注
                if self.save_annotated_images:
                    annotator.box_label(box=[x1, y1, x2, y2], label=label, color=colors(int(cls)))

            # print(self.last_biz_boxes)

            # 上传第三方施工报警
            if any(label in BIZ_CLASS['CONSTRUCTION'] for label in frame_labels):

                if self.last_alarm_ts is not None and time.time() - self.last_alarm_ts < self.alarm_send_interval:
                    continue

                alarm_boxes = [ box for box in self.last_biz_boxes if box[5] in BIZ_CLASS['CONSTRUCTION']]
                alarm_annotator = Annotator(deepcopy(frame))
                for box in alarm_boxes:
                    alarm_annotator.box_label(box=[box[0], box[1], box[2], box[3]], label=box[5], color=COLOR_RED)

                success, jpg_data = cv2.imencode('.jpg', alarm_annotator.result())
                if not success:
                    logger.error(f"摄像头 {self.cam_id} 编码图像失败")
                    continue

                alarm_labels = [box[5] for box in alarm_boxes]
                request_data = {
                    "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if (
                            jpg_data is not None and jpg_data.size > 0) else '',
                    "reportType": '10006',
                    "reportContent": f'识别到{"、".join(alarm_labels)},发现第三方施工',
                    "isAlarm": 1,
                }
                self.last_alarm_ts = time.time()
                logger.debug(process_log_data(request_data))
                # 发送HTTP请求
                asyncio.run_coroutine_threadsafe(
                    self.http_client.send([request_data]),
                    self.loop
                )

            # 保存带标注的图像
            if self.save_annotated_images:
                self.save_count += 1
                if self.save_count % self.save_interval == 0:
                    annotated_frame = annotator.result()
                    self._save_annotated_image(annotated_frame, timestamp)

    def _draw_box_on_image(self, image, x1, y1, x2, y2, label, conf):
        """在图像上绘制标注框"""
        # 绘制矩形框
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # 添加标签和置信度
        text = f"{label}: {conf:.2f}"
        cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return image

    def _save_annotated_image(self, image, timestamp):
        """保存带标注的图像"""
        try:
            # 生成文件名:摄像头ID_时间戳.jpg
            filename = f"{self.cam_id}_{int(timestamp)}.jpg"
            filepath = f"{self.save_path}/{filename}"

            # 保存图像
            cv2.imwrite(filepath, image)
            logger.info(f"摄像头 {self.cam_id} 已保存标注图像: {filepath}")
        except Exception as e:
            logger.exception(f"保存图像时发生错误: {e}")

    def _add_to_batch(self, frame):
        """将帧添加到批处理缓冲区"""
        self.frame_buffer.append(frame)
        self.frame_info_buffer.append({
            "timestamp": time.time()
        })

        # 当缓冲区达到批处理大小时处理批次
        if len(self.frame_buffer) >= self.batch_size:
            self._process_batch(self.frame_buffer, self.frame_info_buffer)
            self.log_fps(len(self.frame_buffer))
            self.frame_buffer = []
            self.frame_info_buffer = []

    def log_fps(self, frame_count):
        self.frames_detected += frame_count
        current_time = time.time()
        # 每秒输出 FPS
        if self.fps_ts is None or current_time - self.fps_ts >= 10:
            fps = self.frames_detected / 10.0
            self.frames_detected = 0
            logger.info(f"FPS (detect) for cam {self.cam_id}: {fps}")
            self.fps_ts = current_time

    def get_last_frame(self):
        """获取最后捕获的帧,如果没有则返回None
        
        Returns:
            numpy.ndarray 或 None: 返回帧的副本,或者在没有可用帧时返回None
        """
        with self.last_frame_lock:
            if self.last_frame is None:
                return None, []
            print(f'====================={self.last_biz_boxes}')
            return self.last_frame.copy(), self.last_biz_boxes.copy()

    def run(self):
        """摄像头处理主循环"""
        logger.info(f"摄像头处理线程 {self.cam_id} 启动")

        # 确保初始化成功
        if not self._open_camera():
            if not self._reconnect_camera():
                logger.error(f"摄像头 {self.cam_id} 无法连接,线程退出")
                return

        failure_count = 0  # 连续读取失败计数

        while self.running:
            try:
                if self.cap is None or not self.cap.isOpened():
                    logger.warning(f"摄像头 {self.cam_id} 未打开或已关闭")
                    if not self._reconnect_camera():
                        break
                    failure_count = 0
                    continue

                ret, frame = self.cap.read()

                if not ret:
                    failure_count += 1
                    logger.warning(f"摄像头 {self.cam_id} 读取帧失败,累计失败次数: {failure_count}")

                    # 达到失败阈值,尝试重连摄像头
                    if failure_count >= self.failure_threshold:
                        logger.info(f"摄像头 {self.cam_id} 读取失败次数过多,正在重连...")
                        if not self._reconnect_camera():
                            break
                        failure_count = 0
                    else:
                        time.sleep(0.1)  # 短暂等待后继续尝试
                    continue
                else:
                    # 读取成功后,重置失败计数
                    failure_count = 0

                    # # 保存最后一帧
                    # with self.last_frame_lock:
                    #     self.last_frame = frame.copy()

                    # 抽帧处理
                    self.frame_count += 1
                    if self.frame_count % self.frame_interval == 0:
                        # self._process_frame(frame)
                        self._add_to_batch(frame)

            except Exception as e:
                logger.exception(f"摄像头 {self.cam_id} 处理过程中发生异常:{e}")
                failure_count += 1

                # 发生异常后,可能需要重置摄像头
                if failure_count >= self.failure_threshold:
                    logger.info(f"摄像头 {self.cam_id} 异常次数过多,正在重连...")
                    if not self._reconnect_camera():
                        break
                    failure_count = 0
                time.sleep(0.5)  # 异常发生后等待时间更长

        # 线程结束前释放资源
        if self.cap is not None:
            self.cap.release()

        logger.info(f"摄像头处理线程 {self.cam_id} 已退出")