import asyncio import threading from multiprocessing.connection import Listener from db.database import get_db from services.device_service import DeviceService from services.global_config import GlobalConfig from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_client_manager import TcpClientManager from tcp.tcp_server import TcpServer from common.global_logger import logger async def main(): """主函数""" db_gen = get_db() db = await db_gen.__anext__() global_config = GlobalConfig() await global_config.init_config() device_service = DeviceService(db) main_loop = asyncio.get_running_loop() # tcp client 用于连接安全树 tcp_manager = TcpClientManager(device_service=device_service, main_loop=main_loop) await tcp_manager.start() # tcp server 用于接收四合一等设备数据 tcp_server = TcpServer() harmful_handler = HarmfulGasHandler(main_loop=main_loop) tcp_server.register_data_callback(harmful_handler.parse) main_loop.create_task(tcp_server.start()) # 启动 listener 的后台线程 listener_thread = threading.Thread( target=start_listener, args=(tcp_manager, main_loop), daemon=True ) listener_thread.start() await asyncio.Event().wait() # 保持运行 def start_listener(tcp_manager, loop): """后台线程:监听进程通信""" address = ('0.0.0.0', 6000) secret = b'from_algo_to_tcp' listener = Listener(address, authkey=secret) print(f"Listener started on {address}") while True: conn = listener.accept() try: while True: # msg = conn.recv() # 阻塞式 # print(f"[IPC] Received from {secret}: {msg}") # device_id = msg.split("_")[0] # tcp_msg = msg.split("_")[1] try: raw_data = conn.recv_bytes() # 接收原始字节 print(f"[IPC] Received from {secret}: {raw_data}") # 解包长度前缀 if len(raw_data) < 4: raise ValueError("数据不足4字节") text_len = int.from_bytes(raw_data[:4], byteorder='big') if len(raw_data) < 4 + text_len: raise ValueError("device_id 长度不足") text_len = int.from_bytes(raw_data[:4], byteorder='big') device_id = raw_data[4:4 + text_len].decode('utf-8') tcp_msg = raw_data[4 + text_len:] # 把任务传给事件循环中运行 asyncio.run_coroutine_threadsafe( tcp_manager.send_message_to_device(int(device_id), tcp_msg), loop ) except EOFError: print("[IPC] Connection closed by peer (EOF).") break except Exception as e: logger.exception(f"[IPC] Received from {secret} failed: {e}") conn.close() break except Exception as e: logger.exception(f"[IPC] Listener error: {e}") finally: conn.close() if __name__ == '__main__': asyncio.run(main())