import cv2
import time
import numpy as np
from threading import Thread, Event

from common.global_logger import logger


class OpenCVStreamLoad:
    """Continuously read a video stream in a background thread.

    The most recently decoded frame is kept in ``self.frame``; iterating the
    object returns that frame on every step. When the stream drops, the
    reader publishes a blank frame and reconnects until it succeeds or
    ``stop()`` is called.
    """

    def __init__(self, camera_url, camera_code, retry_interval=1, vid_stride=1):
        """Connect to the stream and start the reader thread.

        Blocks until the first connection succeeds.

        Args:
            camera_url: Source passed to ``cv2.VideoCapture``; must be
                non-empty.
            camera_code: Caller-defined identifier, stored unchanged.
            retry_interval: Seconds to wait between connection attempts.
            vid_stride: Keep every ``vid_stride``-th frame; values below 1
                are treated as 1.
        """
        assert camera_url is not None and camera_url != ''
        self.url = camera_url
        self.camera_code = camera_code
        self.retry_interval = retry_interval
        # A stride of 0 would raise ZeroDivisionError in the reader loop.
        self.vid_stride = max(1, int(vid_stride))
        # Created before the first connect because get_connect() consults it.
        self.__stop_event = Event()  # flag used to stop the reader thread
        self.cap = self.get_connect()
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.frame_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        ret, first_frame = self.cap.read()
        if not ret or first_frame is None:
            # Guarantee self.frame is always an image so consumers and the
            # np.zeros_like() call in the reader never see None.
            first_frame = np.zeros(
                (self.frame_height, self.frame_width, 3), dtype=np.uint8
            )
        self.frame = first_frame
        self.thread = Thread(target=self.update, daemon=True)
        self.thread.start()

    def create_capture(self):
        """Try to create the capture object; return None if that raises."""
        try:
            # Capture properties could be configured here if ever needed.
            return cv2.VideoCapture(self.url)
        except Exception as e:
            logger.error(e)
            return None

    def get_connect(self):
        """Retry connecting until it succeeds or the loader is stopped.

        Returns:
            An opened capture object, or None if ``stop()`` was requested
            before a connection could be established.
        """
        while not self.__stop_event.is_set():
            logger.info(f"{self.url} try to connect...")
            cap = self.create_capture()
            if cap is not None and cap.isOpened():
                logger.info(f"{self.url} connect success!")
                return cap
            logger.info(f"{self.url} connect failed, retry after {self.retry_interval} second...")
            if cap is not None:
                # Do not leak the capture object of a failed attempt.
                cap.release()
            # Event.wait() sleeps like time.sleep() but wakes up on stop().
            self.__stop_event.wait(self.retry_interval)
        return None

    def _reconnect(self):
        """Release the current capture (if any) and connect again."""
        if self.cap is not None:
            self.cap.release()
        self.cap = self.get_connect()

    def _handle_disconnect(self):
        """Publish a blank frame, then reconnect."""
        logger.info("disconnect, try to reconnect...")
        # Blank first so consumers do not keep seeing a stale image for the
        # whole duration of the outage.
        self.frame = np.zeros_like(self.frame)
        self._reconnect()

    def update(self):
        """Reader loop: grab every frame, decode every ``vid_stride``-th one.

        Runs until ``stop()`` sets the stop event.
        """
        grabbed_n = 0
        log_n = 0
        while not self.__stop_event.is_set():
            try:
                # grab() consumes a frame without decoding it, so skipped
                # frames stay cheap and the loop does not busy-spin.
                if self.cap is None or not self.cap.grab():
                    self._handle_disconnect()
                    continue
                grabbed_n += 1
                if grabbed_n % self.vid_stride != 0:
                    continue
                ret, frame = self.cap.retrieve()
                if not ret:
                    self._handle_disconnect()
                    continue
                self.frame = frame
                # Heartbeat: one debug line per 250 decoded frames.
                if log_n == 0:
                    logger.debug('cap success')
                log_n = (log_n + 1) % 250
            except Exception as e:
                logger.error(f"update fail: {e}")
                self._reconnect()

    def __iter__(self):
        return self

    def __next__(self):
        """Return the latest frame; never raises StopIteration."""
        return self.frame

    def stop(self):
        """Stop the reader thread and release the capture."""
        self.__stop_event.set()
        self.thread.join()  # make sure the thread has fully terminated
        if self.cap is not None:
            self.cap.release()