# safe-algo-pro / tcp_main.py
# Last change: zhangyingjie, 24 days ago (3 KB) - "服务拆分(过程版本)"
# (service split, work-in-progress version)
import asyncio
import threading
from multiprocessing.connection import Listener

from db.database import get_db
from services.device_service import DeviceService
from services.global_config import GlobalConfig
from tcp.harmful_device_handler import HarmfulGasHandler
from tcp.tcp_client_manager import TcpClientManager
from tcp.tcp_server import TcpServer

from common.global_logger import logger


async def main():
    """Start the TCP services and block until the process is stopped.

    Steps, in order:
      1. Take a database handle from ``get_db()`` and build ``DeviceService``.
      2. Initialise ``GlobalConfig``.
      3. Start ``TcpClientManager`` (TCP client side).
      4. Start ``TcpServer`` as a background task, with
         ``HarmfulGasHandler.parse`` registered as its data callback.
      5. Run ``start_listener`` (IPC listener) in a daemon thread.
      6. Wait forever on an event that is never set.
    """

    def _report_server_exit(task):
        # Surface a crash of the server task; without this its exception
        # would stay unobserved for as long as the task object is alive.
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"[TCP] tcp_server.start() failed: {exc!r}")

    # NOTE(review): the generator returned by get_db() is advanced once and
    # never closed here - confirm whether it needs explicit cleanup.
    db_gen = get_db()
    db = await db_gen.__anext__()

    global_config = GlobalConfig()
    await global_config.init_config()

    device_service = DeviceService(db)
    main_loop = asyncio.get_running_loop()

    # TCP client: used to connect to the "safety tree" devices.
    tcp_manager = TcpClientManager(device_service=device_service, main_loop=main_loop)
    await tcp_manager.start()

    # TCP server: receives data from four-in-one (and similar) devices.
    tcp_server = TcpServer()
    harmful_handler = HarmfulGasHandler(main_loop=main_loop)
    tcp_server.register_data_callback(harmful_handler.parse)
    # Keep a strong reference: the event loop only holds tasks weakly, so an
    # unreferenced task may be garbage-collected before it finishes.
    server_task = main_loop.create_task(tcp_server.start())
    server_task.add_done_callback(_report_server_exit)

    # Run the IPC listener in a background daemon thread.
    listener_thread = threading.Thread(
        target=start_listener,
        args=(tcp_manager, main_loop),
        daemon=True
    )
    listener_thread.start()

    await asyncio.Event().wait()  # never set: keeps the process running


def _parse_ipc_frame(raw_data):
    """Split one IPC frame into ``(device_id, payload)``.

    Frame layout, as read by the code below: a 4-byte big-endian length ``n``,
    then ``n`` bytes of UTF-8 text holding the decimal device id, then the
    payload bytes to forward.

    Raises:
        ValueError: if the frame is shorter than its header or its declared
            length, or the id text is not valid UTF-8 / not an integer.
    """
    if len(raw_data) < 4:
        raise ValueError("数据不足4字节")

    text_len = int.from_bytes(raw_data[:4], byteorder='big')
    id_end = 4 + text_len
    if len(raw_data) < id_end:
        raise ValueError("device_id 长度不足")

    device_id = int(raw_data[4:id_end].decode('utf-8'))
    return device_id, raw_data[id_end:]


def _log_send_failure(future, device_id):
    """Done-callback: log an exception raised by a forwarded send."""
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        logger.error(f"[IPC] Sending to device {device_id} failed: {exc!r}")


def _serve_ipc_connection(conn, peer, tcp_manager, loop):
    """Forward frames from one IPC connection until it ends or a frame fails.

    Returns on EOF from the peer, or after logging the first error (a bad
    frame ends the connection). The caller closes ``conn``.
    """
    while True:
        try:
            raw_data = conn.recv_bytes()  # blocking read of one raw message
            print(f"[IPC] Received from {peer}: {raw_data}")

            device_id, tcp_msg = _parse_ipc_frame(raw_data)

            # Hand the send over to the event loop running in the main thread.
            future = asyncio.run_coroutine_threadsafe(
                tcp_manager.send_message_to_device(device_id, tcp_msg), loop
            )
            # Bind device_id now; the loop variable changes on the next frame.
            future.add_done_callback(
                lambda fut, dev=device_id: _log_send_failure(fut, dev)
            )
        except EOFError:
            print("[IPC] Connection closed by peer (EOF).")
            return
        except Exception as e:
            logger.exception(f"[IPC] Received from {peer} failed: {e}")
            return


def start_listener(tcp_manager, loop):
    """Background thread: listen for inter-process messages and forward them.

    Accepts authenticated ``multiprocessing.connection`` clients on port 6000,
    one at a time, and schedules ``tcp_manager.send_message_to_device`` on
    ``loop`` for every frame received. Runs until the thread dies.

    Args:
        tcp_manager: object providing the ``send_message_to_device(device_id,
            data)`` coroutine.
        loop: the asyncio event loop the coroutine is scheduled on.
    """
    import time  # local import: only needed for the accept back-off below

    # NOTE(review): binds all interfaces with a hard-coded auth key - confirm
    # this is intended rather than a loopback-only address.
    address = ('0.0.0.0', 6000)
    secret = b'from_algo_to_tcp'
    accept_retry_delay = 0.5  # seconds to pause after a failed accept

    with Listener(address, authkey=secret) as listener:
        print(f"Listener started on {address}")

        while True:
            try:
                conn = listener.accept()
            except Exception as e:
                # A failed handshake (wrong key, peer hung up, junk bytes)
                # must not kill the listener thread; pause briefly so a
                # persistent socket error cannot turn into a hot loop.
                logger.exception(f"[IPC] Accept failed: {e}")
                time.sleep(accept_retry_delay)
                continue

            # Identify the client by its address; never log the auth key.
            peer = listener.last_accepted
            try:
                _serve_ipc_connection(conn, peer, tcp_manager, loop)
            except Exception as e:
                logger.exception(f"[IPC] Listener error: {e}")
            finally:
                conn.close()


# Script entry point: run main() on a fresh event loop when executed directly.
if __name__ == "__main__":
    asyncio.run(main())