import asyncio
from typing import List, Dict, Set

from common.consts import DEVICE_TYPE, NotifyChangeType
from db.database import get_db
from entity.device import Device
from services.device_service import DeviceService
from common.global_logger import logger
from services.global_config import GlobalConfig
from tcp.tcp_client_connector import TcpClientConnector


class TcpManager:
    """Keep one ``TcpClientConnector`` per connectable TREE device.

    Devices are read through the injected ``DeviceService``. A device is
    considered connectable when its type equals ``DEVICE_TYPE.TREE`` and it
    has a non-empty ``gas_ip``. Connectors are stored in ``connector_map``
    keyed by device id.
    """

    # Port passed to every TcpClientConnector created by this manager.
    DEVICE_PORT = 333

    def __init__(self, device_service: DeviceService):
        """Create an empty manager bound to ``device_service``.

        Args:
            device_service: Service used to look up devices.
        """
        self.devices: List[Device] = []
        # device id -> connector created for that device
        self.connector_map: Dict[int, TcpClientConnector] = {}
        self.device_service = device_service
        # Strong references to running connect tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._connect_tasks: Set[asyncio.Task] = set()
        # Registration of the device/model change callback is currently
        # disabled:
        # self.device_service.register_change_callback(self.on_device_change)

    async def load_and_connect_devices(self):
        """Load all TREE devices from the device service and connect each."""
        devices = await self.device_service.get_device_list(device_type=DEVICE_TYPE.TREE)
        logger.info(f"get {len(devices)} tree devices")
        for device in devices:
            await self.start_device_connect(device)

    async def start(self):
        """Start the manager by loading and connecting all devices."""
        await self.load_and_connect_devices()

    async def start_device_connect(self, device: Device):
        """Create a connector for ``device`` and start connecting in background.

        Does nothing when ``device`` is falsy, is not a TREE device, or has no
        ``gas_ip``. Logs a warning and returns when the device id already has
        a connector, so a device is never connected twice.

        Args:
            device: The device to connect; may be ``None``.
        """
        if not (device and int(device.type) == DEVICE_TYPE.TREE and device.gas_ip):
            return
        if device.id in self.connector_map:
            logger.warning(f"Device {device.id} is already connected.")
            return  # avoid a duplicate connection

        connector = TcpClientConnector(ip=device.gas_ip, port=self.DEVICE_PORT)
        self.connector_map[device.id] = connector

        # Keep a reference until the task finishes (see __init__).
        task = asyncio.create_task(connector.connect())
        self._connect_tasks.add(task)
        task.add_done_callback(self._on_connect_task_done)

    def _on_connect_task_done(self, task: asyncio.Task) -> None:
        """Forget a finished connect task and log its failure, if any."""
        self._connect_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.warning(f"Device connect task failed: {error!r}")

    async def wait_for_connections(self):
        """Wait until every connect task started so far has finished.

        Exceptions raised by the tasks are not propagated here; they are
        reported by the task done-callback instead.
        """
        while self._connect_tasks:
            await asyncio.gather(*list(self._connect_tasks), return_exceptions=True)

    async def stop_device_connect(self, device_id):
        """Remove the connector of ``device_id`` (if any) and disconnect it."""
        if device_id in self.connector_map:
            connector = self.connector_map.pop(device_id)
            await self.disconnect_device(connector)

    async def restart_device_thread(self, device_id):
        """Disconnect ``device_id``, reload it from the service and reconnect."""
        await self.stop_device_connect(device_id)
        device = await self.device_service.get_device(device_id)
        await self.start_device_connect(device)

    async def on_device_change(self, device_id, change_type):
        """Handle a device change notification.

        Args:
            device_id: Id of the device that changed.
            change_type: One of the ``NotifyChangeType`` device values;
                other values are ignored.
        """
        if change_type == NotifyChangeType.DEVICE_CREATE:
            # New device: load it and connect.
            new_device = await self.device_service.get_device(device_id)
            await self.start_device_connect(new_device)
        elif change_type == NotifyChangeType.DEVICE_DELETE:
            await self.stop_device_connect(device_id)
        elif change_type == NotifyChangeType.DEVICE_UPDATE:
            # Device info changed: reconnect with the fresh data.
            await self.restart_device_thread(device_id)

    async def disconnect_device(self, connector: TcpClientConnector):
        """Disconnect the given connector."""
        await connector.disconnect()

    async def send_message_to_device(self, device_id, message: bytes, have_response):
        """Send ``message`` to the device, connecting first if necessary.

        When the device has no connector yet, the device is loaded and a
        connection is started. If no connector could be registered (device
        not found or not connectable), a warning is logged and nothing is
        sent.

        Args:
            device_id: Id of the target device.
            message: Raw payload to send.
            have_response: Forwarded to the connector's ``send_message``.
        """
        if device_id not in self.connector_map:
            device = await self.device_service.get_device(device_id)
            await self.start_device_connect(device)

        # start_device_connect may legitimately register nothing, so the
        # lookup must tolerate a missing key.
        connector = self.connector_map.get(device_id)
        if not connector:
            logger.warning(f"No connector available for device {device_id}; message not sent.")
            return
        await connector.send_message(message, have_response=have_response)


async def main() -> None:
    """Script entry point: initialise config and run a TcpManager.

    ``async for`` / ``await`` are only valid inside a coroutine, so the
    script body lives here and is driven by ``asyncio.run``.
    """
    async for db in get_db():
        global_config = GlobalConfig()
        await global_config.init_config()
        device_service = DeviceService(db)
        tcp_manager = TcpManager(device_service)
        await tcp_manager.start()
        # asyncio.run cancels tasks that are still pending when main()
        # returns, so wait for the background connect tasks here.
        await tcp_manager.wait_for_connections()


if __name__ == '__main__':
    asyncio.run(main())