"""Service entry point: start the algorithm runner and keep it alive until interrupted."""

import asyncio
import concurrent.futures

from algo.algo_runner import AlgoRunner
from algo.scene_runner import SceneRunner
from common.biz_exception import BizExceptionHandlers
from common.global_logger import logger
from db.database import get_db
from services.device_model_relation_service import DeviceModelRelationService
from services.device_scene_relation_service import DeviceSceneRelationService
from services.device_service import DeviceService
from services.global_config import GlobalConfig
from services.model_service import ModelService
from services.scene_service import SceneService

# Seconds between wake-ups of the keep-alive loop in run().
_KEEPALIVE_INTERVAL_SECONDS = 1


async def run():
    """Initialise configuration and services, start the AlgoRunner, then idle.

    For each ``db`` produced by ``get_db()`` the global configuration is
    initialised, the service objects are built on top of ``db`` and
    ``AlgoRunner.start()`` is awaited.  The coroutine then sleeps in an
    endless loop so the event loop stays alive; it never returns on its own
    and is expected to be stopped by cancellation (see ``main``).

    NOTE: this coroutine deliberately does not call the blocking
    ``concurrent.futures.wait()`` on ``algo_runner.task_futures``; a blocking
    call inside a coroutine would stall the event loop.
    """
    async for db in get_db():
        global_config = GlobalConfig()
        await global_config.init_config()

        device_service = DeviceService(db)
        model_service = ModelService(db)
        model_relation_service = DeviceModelRelationService(db)
        # NOTE(review): the two scene services are constructed but not used
        # below; kept because their constructors are defined elsewhere and
        # may have side effects -- confirm and remove if they do not.
        scene_service = SceneService(db)
        scene_relation_service = DeviceSceneRelationService(db)

        algo_runner = AlgoRunner(
            device_service=device_service,
            model_service=model_service,
            relation_service=model_relation_service,
        )
        await algo_runner.start()

        # Keep the main coroutine (and therefore the event loop) running.
        while True:
            await asyncio.sleep(_KEEPALIVE_INTERVAL_SECONDS)


def _cancel_pending_tasks(loop):
    """Cancel every task still registered on *loop* and wait for them to finish."""
    tasks = asyncio.all_tasks(loop=loop)
    for task in tasks:
        task.cancel()
    if tasks:
        # return_exceptions=True: collect CancelledError instead of raising it.
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))


def main():
    """Run ``run()`` on a fresh event loop and shut down cleanly on interrupt."""
    # Create the loop explicitly: asyncio.get_event_loop() without a current
    # loop is deprecated and raises RuntimeError on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run())
    except (KeyboardInterrupt, SystemExit):
        logger.info("Shutting down gracefully...")
        _cancel_pending_tasks(loop)
    finally:
        # Finalise async generators (e.g. the one behind get_db()) before the
        # loop is closed, and close the loop on every exit path.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


# Script entry point.
if __name__ == "__main__":
    main()