import threading
from datetime import datetime


class DeviceStatusManager:
    """Singleton registry of the last time each device reported in.

    Every call to ``DeviceStatusManager()`` returns the same object. The
    registry maps a device id to the ``datetime`` recorded by the most recent
    ``set_status`` call for that id; a device counts as online while that
    timestamp is younger than ``ONLINE_TIMEOUT_SECONDS``.

    Attributes:
        device_status: Mapping of device id to the last recorded timestamp.
        lock: Guards every read and write of ``device_status``.
    """

    # Age (in seconds) of the last report at which a device is considered
    # offline. Override on the class or instance to change the window.
    ONLINE_TIMEOUT_SECONDS = 60

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        Uses double-checked locking: the unlocked test keeps the common path
        cheap, and the second test under ``_lock`` ensures only one thread
        ever builds the instance.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Populate the state BEFORE publishing the instance, so
                    # no other thread can obtain a half-initialized object
                    # (one without ``device_status`` or ``lock``).
                    instance.device_status = {}
                    instance.lock = threading.Lock()
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        """Do nothing: the shared state is created once inside ``__new__``.

        ``__init__`` runs on every ``DeviceStatusManager()`` call, so it must
        not reset ``device_status``; keeping it empty makes repeated
        construction harmless.
        """

    def get_status(self, device_id):
        """Return the last recorded timestamp for a device.

        Args:
            device_id: Key identifying the device.

        Returns:
            The ``datetime`` stored by the latest ``set_status`` call for
            ``device_id``, or ``None`` if the device has never been recorded.
        """
        with self.lock:
            return self.device_status.get(device_id)

    def set_status(self, device_id):
        """Record the current local time as the device's last report.

        Args:
            device_id: Key identifying the device.
        """
        with self.lock:
            self.device_status[device_id] = datetime.now()

    def is_device_online(self, device_id):
        """Tell whether a device reported within the online window.

        Args:
            device_id: Key identifying the device.

        Returns:
            ``True`` if a timestamp exists for ``device_id`` and is less than
            ``ONLINE_TIMEOUT_SECONDS`` old, otherwise ``False``. A timestamp
            later than the current clock reading yields a negative age and is
            therefore reported as online.
        """
        last_seen = self.get_status(device_id)
        if last_seen is None:
            return False
        # total_seconds() covers the whole interval. timedelta.seconds is only
        # the sub-day component, so it would report a device last seen
        # "1 day, 10 s" ago as being 10 s old.
        elapsed = (datetime.now() - last_seen).total_seconds()
        return elapsed < self.ONLINE_TIMEOUT_SECONDS