# main.py import asyncio from config import CAMERAS, TCP_SERVER, HTTP_SERVER, MODEL from tcp_client import AsyncTCPClient from http_client import AsyncHTTPClient from model_wrapper import ModelWrapper from camera_processor import CameraProcessor from handle_tcp_command import HandelTCPCommand from global_logger import logger async def main(): logger.info("开始启动算法分析服务") cap_camera_processors = {} loop = asyncio.get_running_loop() # 获取当前主线程的事件循环 tcp_client = AsyncTCPClient(TCP_SERVER["host"], TCP_SERVER["port"]) await tcp_client.connect() cap_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) alarm_upload_http_client = AsyncHTTPClient(HTTP_SERVER["url"], HTTP_SERVER["timeout"]) # 启动 TCP 和 HTTP 的发送任务 asyncio.create_task(tcp_client.send_loop()) asyncio.create_task(tcp_client.receive_loop()) asyncio.create_task(cap_upload_http_client.send_loop()) asyncio.create_task(alarm_upload_http_client.send_loop()) model_wrapper = ModelWrapper(MODEL["path"], MODEL["size"], MODEL["class_map"], MODEL["batch_size"]) # 为每个摄像头启动一个处理线程,并传入事件循环 for camera_config in CAMERAS: camera_thread = CameraProcessor(camera_config, model_wrapper, tcp_client, alarm_upload_http_client, loop, MODEL["batch_size"]) camera_thread.start() if camera_config.get("receive_capture_command", False): cam_id = camera_config.get('cam_id') cap_camera_processors[cam_id] = camera_thread # 创建TCP命令处理器并注册回调,直接传入http_client handler = HandelTCPCommand(cap_camera_processors, cap_upload_http_client) tcp_client.register_command_handler(handler.handle_capture_command) while True: await asyncio.sleep(1) if __name__ == '__main__': asyncio.run(main())