Newer
Older
safe-algo-pro / common / display_frame_manager.py
import threading
from collections import deque


class DisplayFrameManager:
    """Singleton that keeps a bounded queue of recent frames per device.

    Every construction returns the same instance. Each device gets its own
    ``deque`` capped at ``maxlen``, with the newest frame stored at index 0;
    when a queue is full, adding a frame discards the oldest one. All
    methods below access ``device_queues`` while holding the class-level
    lock.
    """

    _instance = None
    # Guards singleton creation, one-time initialisation, and every access
    # to ``device_queues`` made through the methods of this class.
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, maxlen=10):
        """Set up the shared state the first time the singleton is built.

        Args:
            maxlen: Maximum number of frames kept per device. Only the value
                passed to the first construction takes effect; later
                constructions leave the existing state untouched.
        """
        if hasattr(self, '_initialized'):
            return
        with self._lock:
            # Re-check under the lock so that two threads racing through the
            # first construction cannot both reset ``device_queues``.
            if not hasattr(self, '_initialized'):
                self.device_queues = {}
                self.maxlen = maxlen
                self._initialized = True

    def add_frame(self, device_id, frame) -> None:
        """Store ``frame`` as the newest frame of ``device_id``.

        The device's queue is created on first use. When the queue is
        already full, the oldest frame is dropped.
        """
        with self._lock:
            queue = self.device_queues.get(device_id)
            if queue is None:
                queue = deque(maxlen=self.maxlen)
                self.device_queues[device_id] = queue
            # A bounded deque discards from the right end on appendleft, so
            # no manual eviction is needed (and maxlen of 0 or None is safe).
            queue.appendleft(frame)

    def get_latest_frame(self, device_id):
        """Return the newest frame of ``device_id``.

        Returns:
            The most recently added frame, or ``None`` when the device is
            unknown or its queue is empty.
        """
        with self._lock:
            queue = self.device_queues.get(device_id)
            if queue:
                return queue[0]
            return None

    def remove_device(self, device_id) -> None:
        """Forget ``device_id`` and its queued frames; no-op if unknown."""
        with self._lock:
            self.device_queues.pop(device_id, None)

    def has_device(self, device_id) -> bool:
        """Return ``True`` if ``device_id`` has at least one queued frame."""
        with self._lock:
            # bool() keeps the internal deque from leaking to the caller.
            return bool(self.device_queues.get(device_id))