# safe-algo-pro / tcp / tcp_client_manager.py
import asyncio
from typing import List, Dict

from common.consts import DEVICE_TYPE, NotifyChangeType, TREE_COMMAND
from db.database import get_db
from entity.device import Device
from services.device_service import DeviceService

from common.global_logger import logger
from tcp.gas_device_handler import GasDataHandler
from tcp.tcp_connection import TcpConnection


class TcpClientManager:
    """Manage one TcpConnection per device and expose a send API to callers.

    Connections are kept in ``connector_map`` keyed by device id. Every
    background task started for a connection (monitor, connect, periodic
    query) is retained in ``_background_tasks`` until it finishes, because
    the event loop itself only holds weak references to tasks.
    """

    # Defaults applied to every device connection. They are class attributes
    # so a subclass or instance can override them without editing the methods.
    DEFAULT_GAS_PORT = 333
    # Passed as ``interval`` to ``start_periodic_query``; the unit is defined
    # by TcpConnection (presumably seconds — confirm against that class).
    DEFAULT_QUERY_INTERVAL = 3

    def __init__(self, device_service, main_loop=None):
        """Store collaborators and create empty connection/task registries.

        Args:
            device_service: Service used to look up devices; must provide the
                awaitable methods ``get_device_list`` and ``get_device``.
            main_loop: Optional event loop handed to each GasDataHandler.
        """
        self.main_loop = main_loop
        self.device_service = device_service
        self.connector_map = {}  # device_id -> TcpConnection
        # Strong references to running background tasks (see _spawn).
        self._background_tasks = set()

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it is done.

        Without a strong reference a pending task may be garbage-collected
        before it completes, silently stopping the monitor/query loops.

        Returns:
            The created ``asyncio.Task``.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        # Drop the reference once the task finishes so the set cannot grow.
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start(self):
        """Load all tree devices from the device service and connect each one."""
        devices = await self.device_service.get_device_list(device_type=DEVICE_TYPE.TREE)
        logger.info(f"get {len(devices)} tree devices")
        for device in devices:
            await self.start_device_connect(device)

    async def start_device_connect(self, device):
        """Create the connection for ``device`` and start its background tasks.

        Does nothing except log a warning when the device already has an
        entry in ``connector_map``.

        Args:
            device: Device record; ``device.id`` and ``device.gas_ip`` are read.
        """
        if device.id in self.connector_map:
            logger.warning(f"设备 {device.id} 已连接")
            return
        connector = TcpConnection(ip=device.gas_ip, port=self.DEFAULT_GAS_PORT)
        self._spawn(connector.connection_monitor())
        handler = GasDataHandler(main_loop=self.main_loop)
        connector.register_data_handler(handler.handle_data)
        # Register before scheduling connect so concurrent callers see the
        # entry and do not create a second connection for the same device.
        self.connector_map[device.id] = connector
        self._spawn(connector.connect())
        # Start the periodic gas query. The interval is currently a fixed
        # class-level default, not read from the device configuration.
        self._spawn(
            connector.start_periodic_query(
                query_command=TREE_COMMAND.GAS_QUERY,
                interval=self.DEFAULT_QUERY_INTERVAL,
            )
        )

    async def send_message_to_device(self, device_id: int, message: bytes, have_response: bool = True):
        """Send ``message`` to a device, connecting to it first if necessary.

        Args:
            device_id: Id of the target device.
            message: Raw bytes to send.
            have_response: Forwarded unchanged to ``TcpConnection.send_message``.

        If the device is not yet connected it is looked up through the device
        service. When that lookup yields ``None`` the message is dropped with
        a warning instead of failing on attribute access.
        """
        if device_id not in self.connector_map:
            device = await self.device_service.get_device(device_id)
            if device is None:
                logger.warning(f"device {device_id} not found, message dropped")
                return
            await self.start_device_connect(device)
        connector = self.connector_map.get(device_id)
        if connector:
            await connector.send_message(message, have_response=have_response)