# Newer
# Older
# safe-algo-pro / tcp / tcp_manager.py
# zhangyingjie on 2 Nov 3 KB 增加甲烷查询任务及接口 (commit message: "Add methane query task and interface")
import asyncio
from typing import List, Dict

from algo.algo_runner_manager import get_algo_runner
from common.consts import DEVICE_TYPE, NotifyChangeType
from db.database import get_db
from entity.device import Device
from services.device_service import DeviceService


from common.global_logger import logger
from tcp.tcp_client_connector import TcpClientConnector


class TcpManager:
    """Manage one ``TcpClientConnector`` per tree-type device.

    The manager obtains its ``device_service`` from the global algo runner and
    registers ``on_device_change`` as a change callback on it, so that device
    create / update / delete notifications start, restart or stop the
    corresponding connector.
    """

    # Default TCP port used when connecting to a device's ``gas_ip``.
    DEFAULT_GAS_PORT = 333

    def __init__(self, gas_port: int = DEFAULT_GAS_PORT):
        """Create the manager and subscribe to device change notifications.

        Args:
            gas_port: TCP port passed to every ``TcpClientConnector`` created
                by this manager. Defaults to ``DEFAULT_GAS_PORT`` (333).
        """
        # Not populated anywhere in this class; kept as a public attribute.
        self.devices: List[Device] = []
        # Active connectors, keyed by device id.
        self.connector_map: Dict[int, TcpClientConnector] = {}
        self.gas_port = gas_port

        # Strong references to the background connect tasks. The event loop
        # only keeps weak references to tasks, so a task whose handle is
        # dropped may be garbage-collected before it finishes.
        self._connect_tasks: set = set()

        # Take device_service from the global algo runner so that this manager
        # receives the device update notifications.
        algo_runner = get_algo_runner()
        self.device_service = algo_runner.device_service

        # Register the callback invoked on device changes.
        self.device_service.register_change_callback(self.on_device_change)

    async def load_and_connect_devices(self) -> None:
        """Fetch all tree-type devices from the device service and connect them."""
        devices = self.device_service.get_device_list(device_type=DEVICE_TYPE.TREE)
        logger.info(f"get {len(devices)} tree devices")
        for device in devices:
            await self.start_device_connect(device)

    async def start(self) -> None:
        """Start the manager by loading and connecting all current devices."""
        await self.load_and_connect_devices()

    async def start_device_connect(self, device: Device) -> None:
        """Create a connector for ``device`` and start connecting in the background.

        Nothing happens when ``device`` is falsy, is not a tree-type device,
        or has no ``gas_ip``. A device that already has a connector is skipped
        with a warning so that it is never connected twice.

        Args:
            device: The device to connect; may be falsy (e.g. when a lookup
                found nothing), in which case the call is a no-op.
        """
        if not (device and int(device.type) == DEVICE_TYPE.TREE and device.gas_ip):
            return

        if device.id in self.connector_map:
            logger.warning(f"Device {device.id} is already connected.")
            return  # Prevent duplicate connections.

        connector = TcpClientConnector(ip=device.gas_ip, port=self.gas_port)
        self.connector_map[device.id] = connector

        # Run connect() as a background task, and hold on to the task object
        # until it is done so that it cannot be garbage-collected mid-flight.
        task = asyncio.create_task(connector.connect())
        self._connect_tasks.add(task)
        task.add_done_callback(
            lambda finished, device_id=device.id: self._on_connect_task_done(device_id, finished)
        )

    def _on_connect_task_done(self, device_id, task) -> None:
        """Release a finished connect task and log the exception it raised, if any."""
        self._connect_tasks.discard(task)
        if task.cancelled():
            # exception() would raise CancelledError for a cancelled task.
            return
        error = task.exception()
        if error is not None:
            logger.error(f"Connect task for device {device_id} failed: {error!r}")

    async def stop_device_connect(self, device_id: int) -> None:
        """Remove the connector registered for ``device_id`` and disconnect it.

        Unknown device ids are ignored.
        """
        connector = self.connector_map.pop(device_id, None)
        if connector is not None:
            await self.disconnect_device(connector)

    async def restart_device_thread(self, device_id: int) -> None:
        """Stop the connector of ``device_id`` and connect again with fresh device data."""
        await self.stop_device_connect(device_id)
        device = self.device_service.get_device(device_id)
        await self.start_device_connect(device)

    async def on_device_change(self, device_id: int, change_type) -> None:
        """Callback handling device changes.

        TODO: thread handling still to be optimized.

        Args:
            device_id: Id of the device that changed.
            change_type: One of the ``NotifyChangeType`` device values; other
                values are ignored.
        """
        if change_type == NotifyChangeType.DEVICE_CREATE:
            # New device: load it and connect.
            new_device = self.device_service.get_device(device_id)
            await self.start_device_connect(new_device)

        elif change_type == NotifyChangeType.DEVICE_DELETE:
            await self.stop_device_connect(device_id)

        elif change_type == NotifyChangeType.DEVICE_UPDATE:
            # Device information changed: reconnect.
            await self.restart_device_thread(device_id)

    async def disconnect_device(self, connector: TcpClientConnector) -> None:
        """Disconnect the given connector."""
        await connector.disconnect()

if __name__ == '__main__':
    async def _run_forever(manager: TcpManager) -> None:
        """Start the manager, then keep the event loop alive.

        ``start()`` only schedules the connect tasks and returns. If the
        coroutine handed to ``asyncio.run`` finished at that point,
        ``asyncio.run`` would cancel those still-pending tasks while shutting
        the loop down, so no connection would ever be made.
        """
        await manager.start()
        # Block until the process is interrupted (e.g. Ctrl+C).
        await asyncio.Event().wait()

    tcp_manager = TcpManager()
    asyncio.run(_run_forever(tcp_manager))