import asyncio
from typing import List, Dict

from common.consts import DEVICE_TYPE, NotifyChangeType, TREE_COMMAND
from db.database import get_db
from entity.device import Device
from services.device_service import DeviceService
from common.global_logger import logger
from tcp.gas_device_handler import GasDataHandler
from tcp.tcp_connection import TcpConnection


class TcpClientManager:
    """Manage one ``TcpConnection`` per device and expose a send interface.

    Connections are keyed by device id in ``connector_map``. Each connection
    gets three background tasks (connection monitor, connect, periodic
    query); the manager keeps strong references to those tasks so they are
    not garbage-collected while still running.
    """

    # TCP port used for every gas device connection.
    GAS_PORT = 333
    # Value passed as ``interval`` to ``TcpConnection.start_periodic_query``.
    # NOTE(review): an earlier comment said the interval comes from device
    # configuration, but the code has always used this fixed value.
    QUERY_INTERVAL = 3

    def __init__(self, device_service, main_loop=None):
        """Store collaborators and initialise empty connection state.

        Args:
            device_service: Service providing ``get_device_list`` and
                ``get_device`` coroutines.
            main_loop: Forwarded unchanged to every ``GasDataHandler``.
        """
        self.main_loop = main_loop
        self.device_service = device_service
        self.connector_map: Dict[int, TcpConnection] = {}  # device_id -> TcpConnection
        # Strong references to fire-and-forget tasks; the event loop itself
        # only holds weak references, so unreferenced tasks may vanish.
        self._background_tasks = set()

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it ends."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start(self) -> None:
        """Load all tree devices from the device service and connect each."""
        devices = await self.device_service.get_device_list(device_type=DEVICE_TYPE.TREE)
        logger.info(f"get {len(devices)} tree devices")
        for device in devices:
            await self.start_device_connect(device)

    async def start_device_connect(self, device) -> None:
        """Create a connection for ``device`` and launch its background tasks.

        Does nothing except log a warning when the device id already has an
        entry in ``connector_map``.

        Args:
            device: Object exposing ``id`` and ``gas_ip`` attributes.
        """
        if device.id in self.connector_map:
            logger.warning(f"设备 {device.id} 已连接")
            return

        connector = TcpConnection(ip=device.gas_ip, port=self.GAS_PORT)
        self._spawn(connector.connection_monitor())

        handler = GasDataHandler(main_loop=self.main_loop)
        connector.register_data_handler(handler.handle_data)
        self.connector_map[device.id] = connector

        self._spawn(connector.connect())
        # Start the periodic gas query for this connection.
        self._spawn(
            connector.start_periodic_query(
                query_command=TREE_COMMAND.GAS_QUERY,
                interval=self.QUERY_INTERVAL,
            )
        )

    async def send_message_to_device(self, device_id: int, message: bytes,
                                     have_response: bool = True) -> None:
        """Send ``message`` to a device, connecting to it first if needed.

        Args:
            device_id: Key of the target device in ``connector_map``.
            message: Payload handed to ``TcpConnection.send_message``.
            have_response: Forwarded to ``TcpConnection.send_message``.
        """
        if device_id not in self.connector_map:
            device = await self.device_service.get_device(device_id)
            if device is None:
                # Guard: without it, an unknown id would crash on `device.id`
                # inside start_device_connect.
                logger.warning(f"device {device_id} not found, message not sent")
                return
            await self.start_device_connect(device)

        connector = self.connector_map.get(device_id)
        if connector:
            await connector.send_message(message, have_response=have_response)