Newer
Older
safe-algo-pro / common / device_status_manager.py
zhangyingjie on 4 Mar 1 KB 部署版本
import threading
from datetime import datetime


class DeviceStatusManager:
    """Process-wide singleton tracking when each device was last seen.

    Every call to ``DeviceStatusManager()`` returns the same instance.
    ``set_status`` records the current local time for a device, and
    ``is_device_online`` reports whether that timestamp is recent enough.
    Access to the underlying mapping is serialized with an instance lock.
    """

    # Default freshness window: a device counts as online when its last
    # recorded timestamp is less than this many seconds old.
    ONLINE_TIMEOUT_SECONDS = 60

    _instance = None
    # Guards both singleton creation and the one-time instance initialization.
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        # Check once without the lock (cheap common path), then again while
        # holding it so that only one thread ever creates the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize the shared state exactly once.

        ``__init__`` runs on every ``DeviceStatusManager()`` call, so the
        state is only created when it does not exist yet.
        """
        # Hold the class lock so two threads constructing the manager at the
        # same time cannot both initialize it (which would discard recorded
        # statuses) or observe ``device_status`` before ``lock`` exists.
        with self._lock:
            if not hasattr(self, "device_status"):
                self.lock = threading.Lock()
                # Maps device_id -> datetime of the latest set_status() call.
                self.device_status = {}

    def get_status(self, device_id):
        """Return the last recorded timestamp for *device_id*.

        Returns:
            The ``datetime`` stored by the latest ``set_status`` call for
            this device, or ``None`` if the device has never been recorded.
        """
        with self.lock:
            return self.device_status.get(device_id)

    def set_status(self, device_id) -> None:
        """Record the current local time as the last-seen time of *device_id*."""
        with self.lock:
            self.device_status[device_id] = datetime.now()

    def is_device_online(self, device_id, timeout_seconds=None) -> bool:
        """Report whether *device_id* was recorded within the freshness window.

        Args:
            device_id: Key previously passed to ``set_status``.
            timeout_seconds: Maximum age, in seconds, of the last recorded
                timestamp. Defaults to ``ONLINE_TIMEOUT_SECONDS``.

        Returns:
            ``True`` if a timestamp exists and is younger than the window,
            ``False`` otherwise (including for unknown devices).
        """
        if timeout_seconds is None:
            timeout_seconds = self.ONLINE_TIMEOUT_SECONDS
        last_seen = self.get_status(device_id)
        if last_seen is None:
            return False
        # total_seconds() covers the whole interval; timedelta.seconds is only
        # the sub-day component and wraps around every 24 hours, which would
        # make a device last seen over a day ago look online again.
        return (datetime.now() - last_seen).total_seconds() < timeout_seconds