Newer
Older
safe-algo-pro / tcp / tcp_manager.py
import asyncio
from typing import List, Dict


from common.consts import DEVICE_TYPE, NotifyChangeType
from db.database import get_db
from entity.device import Device
from services.device_service import DeviceService

from common.global_logger import logger
from services.global_config import GlobalConfig
from tcp.tcp_client_connector import TcpClientConnector


class TcpManager:
    def __init__(self, device_service: DeviceService):
        self.devices: List[Device] = []
        self.connector_map: Dict[int, TcpClientConnector] = {}

        self.device_service = device_service

        # 注册设备和模型的变化回调
        # self.device_service.register_change_callback(self.on_device_change)

    async def load_and_connect_devices(self):
        """从数据库加载设备并连接所有设备"""
        devices = await self.device_service.get_device_list(device_type=DEVICE_TYPE.TREE)  # 使用局部变量
        logger.info(f"get {len(devices)} tree devices")
        for device in devices:
            await self.start_device_connect(device)

    async def start(self):
        """启动设备管理器"""
        await self.load_and_connect_devices()

    async def start_device_connect(self, device: Device):
        if device and int(device.type) == DEVICE_TYPE.TREE and device.gas_ip:
            if device.id in self.connector_map:
                logger.warning(f"Device {device.id} is already connected.")
                return  # 防止重复连接
            connector = TcpClientConnector(ip=device.gas_ip, port=333)
            self.connector_map[device.id] = connector
            asyncio.create_task(connector.connect())

    async def stop_device_connect(self, device_id):
        if device_id in self.connector_map:
            connector = self.connector_map.pop(device_id)
            await self.disconnect_device(connector)

    async def restart_device_thread(self, device_id):
        await self.stop_device_connect(device_id)
        device = await self.device_service.get_device(device_id)
        await self.start_device_connect(device)

    async def on_device_change(self, device_id, change_type):
        """设备变化时的回调处理"""
        if change_type == NotifyChangeType.DEVICE_CREATE:
            # 新增设备,加载新设备并连接
            new_device = await self.device_service.get_device(device_id)
            await self.start_device_connect(new_device)

        elif change_type == NotifyChangeType.DEVICE_DELETE:
            await self.stop_device_connect(device_id)

        elif change_type == NotifyChangeType.DEVICE_UPDATE:
            # 更新设备信息,重新连接
            await self.restart_device_thread(device_id)

    async def disconnect_device(self, connector: TcpClientConnector):
        """断开设备连接"""
        await connector.disconnect()

    async def send_message_to_device(self, device_id, message: bytes, have_response):
        if device_id not in self.connector_map:
            device = await self.device_service.get_device(device_id)
            await self.start_device_connect(device)
        connector = self.connector_map[device_id]
        if connector:
            await connector.send_message(message, have_response=have_response)



if __name__ == '__main__':
    async for db in get_db():
        global_config = GlobalConfig()
        await global_config.init_config()
        device_service = DeviceService(db)
        tcp_manager = TcpManager(device_service)
        asyncio.run(tcp_manager.start())