# Source: safe-algo-pro / algo / stream_loader.py
# Last change: zhangyingjie on 14 Oct (3 KB) - "完善模型检测流程" (improve the model detection flow)
import cv2
import time
import numpy as np
from threading import Thread, Event
from common.global_logger import logger


class OpenCVStreamLoad:
    """Keep the most recent frame of an OpenCV video stream available.

    The constructor blocks until the stream at ``camera_url`` can be opened,
    reads one frame, and then starts a daemon thread running :meth:`update`.
    That thread keeps overwriting ``self.frame`` with the newest decoded frame
    and reconnects when the stream drops. Iterating the object never ends:
    every ``next()`` returns whatever ``self.frame`` currently holds.

    Args:
        camera_url: Non-empty source passed to ``cv2.VideoCapture``.
        camera_code: Identifier stored as ``self.camera_code``; not used by
            this class itself.
        retry_interval: Seconds to wait between connection attempts.
        vid_stride: Publish every ``vid_stride``-th frame; values below 1 are
            treated as 1.
    """

    # Emit the 'cap success' debug line once per this many published frames.
    _LOG_EVERY_N_FRAMES = 250

    def __init__(self, camera_url, camera_code,
                 retry_interval=1,
                 vid_stride=1):
        assert camera_url is not None and camera_url != '', \
            "camera_url must be a non-empty value"
        self.url = camera_url
        self.camera_code = camera_code
        self.retry_interval = retry_interval
        self.vid_stride = vid_stride

        # Created before the first connection attempt because get_connect()
        # consults it on every retry.
        self.__stop_event = Event()

        self.cap = self.get_connect()
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS))

        # A failed first read must not leave an unusable placeholder behind:
        # the reconnect path derives its blank frame from self.frame.
        self.frame = None
        ret, frame = self.cap.read()
        self.frame = frame if ret and frame is not None else self._blank_frame()

        self.thread = Thread(target=self.update, daemon=True)
        self.thread.start()

    def _blank_frame(self):
        """Return an all-zero stand-in frame, or None if no size is known."""
        previous = getattr(self, 'frame', None)
        if previous is not None:
            return np.zeros_like(previous)
        width = getattr(self, 'frame_width', 0)
        height = getattr(self, 'frame_height', 0)
        if width > 0 and height > 0:
            # NOTE(review): assumes 3-channel 8-bit frames - confirm for
            # grayscale or high-bit-depth sources.
            return np.zeros((height, width, 3), dtype=np.uint8)
        return None

    def create_capture(self):
        """Try to create a capture object for ``self.url``.

        Returns:
            The ``cv2.VideoCapture`` instance (which may not be opened), or
            None if constructing it raised.
        """
        try:
            cap = cv2.VideoCapture(self.url)
            # Capture properties could be configured here if needed.
            return cap
        except Exception as e:
            logger.error(e)
            return None

    def get_connect(self):
        """Retry connecting until the stream opens or a stop is requested.

        Returns:
            An opened capture object. If :meth:`stop` was requested first, the
            result of the last failed attempt (possibly None) is returned
            instead, so callers on the shutdown path must tolerate None.
        """
        cap = None
        while not self.__stop_event.is_set():
            logger.info(f"{self.url} try to connect...")
            cap = self.create_capture()
            if cap is not None and cap.isOpened():
                logger.info(f"{self.url} connect success!")
                return cap
            logger.info(f"{self.url} connect failed, retry after {self.retry_interval} second...")
            if cap is not None:
                # Drop the failed handle instead of leaking one per retry.
                cap.release()
            # Event.wait sleeps like time.sleep but wakes up as soon as stop()
            # is called, so shutdown is not delayed by the retry interval.
            self.__stop_event.wait(self.retry_interval)
        return cap

    def _reconnect(self):
        """Blank the published frame, release the capture and reconnect."""
        logger.info("disconnect, try to reconnect...")
        # Blank first so consumers do not keep processing a stale image for
        # the whole duration of the outage.
        self.frame = self._blank_frame()
        if self.cap is not None:
            self.cap.release()
        self.cap = self.get_connect()

    def update(self):
        """Thread body: publish every ``vid_stride``-th frame until stopped."""
        vid_n = 0
        log_n = 0
        while not self.__stop_event.is_set():
            try:
                # grab() advances the stream without decoding, so skipped
                # frames are actually consumed rather than busy-waited on.
                if not self.cap.grab():
                    self._reconnect()
                    continue
                vid_n += 1
                if vid_n % max(self.vid_stride, 1) != 0:
                    continue
                ret, frame = self.cap.retrieve()
                if not ret:
                    self._reconnect()
                    continue
                self.frame = frame
                if log_n == 0:
                    logger.debug('cap success')
                log_n = (log_n + 1) % self._LOG_EVERY_N_FRAMES
            except Exception as e:
                logger.error(f"update fail: {e}")
                if self.cap is not None:
                    self.cap.release()
                self.cap = self.get_connect()  # try to reconnect

    def __iter__(self):
        return self

    def __next__(self):
        """Return the latest published frame; iteration never terminates."""
        return self.frame

    def stop(self):
        """Stop the reader thread and release the capture object."""
        self.__stop_event.set()
        self.thread.join()  # make sure the thread has fully terminated
        if self.cap is not None:
            self.cap.release()