# safe-algo-pro/common/harmful_gas_manager.py
import threading
from datetime import datetime


class HarmfulGasManager:
    """Singleton in-memory store of harmful-gas readings, keyed by device.

    Every ``HarmfulGasManager()`` call returns the same object. Readings are
    kept in a two-level mapping, ``device_data[device_code][gas_type]``, and
    the accessor methods below hold ``self.lock`` while touching it.
    """

    _instance = None
    # Guards creation and initialization of the singleton instance.
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating and initializing it once."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    instance = super(HarmfulGasManager, cls).__new__(cls)
                    # Build the state here, before the instance is published,
                    # so no caller can observe a half-initialized object and
                    # no concurrent constructor call can reset stored data.
                    instance.device_data = {}
                    instance.lock = threading.Lock()
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        """Do nothing; the shared state is created once in ``__new__``.

        ``__init__`` runs on every ``HarmfulGasManager()`` call, so it must
        not touch ``device_data`` or ``lock``.
        """

    def get_device_all_data(self, device_code):
        """Return all stored gas readings for a device.

        Args:
            device_code: Key identifying the device.

        Returns:
            The ``gas_type -> gas_data`` dict stored for ``device_code``, or
            ``None`` if nothing has been stored for it. The returned dict is
            the manager's internal mapping, not a copy.
        """
        with self.lock:
            return self.device_data.get(device_code)

    def get_device_data(self, device_code, gas_type):
        """Return the stored reading of one gas type for a device.

        Args:
            device_code: Key identifying the device.
            gas_type: Key identifying the gas within that device's readings.

        Returns:
            The value last stored for ``(device_code, gas_type)``, or ``None``
            if the device or the gas type has no stored reading.
        """
        with self.lock:
            readings = self.device_data.get(device_code)
            if readings is None:
                return None
            return readings.get(gas_type)

    def set_device_data(self, device_code, gas_type, gas_data):
        """Store (or overwrite) the reading of one gas type for a device.

        Args:
            device_code: Key identifying the device.
            gas_type: Key identifying the gas within that device's readings.
            gas_data: Value to store; it is kept as-is.
        """
        with self.lock:
            # setdefault creates the per-device dict on first use and reuses
            # the existing one afterwards.
            self.device_data.setdefault(device_code, {})[gas_type] = gas_data