import uuid from concurrent.futures import ThreadPoolExecutor import threading from common.global_logger import logger def generate_thread_id(): """生成唯一的线程 ID""" return str(uuid.uuid4()) class GlobalThreadPool: _instance = None _lock = threading.Lock() def __new__(cls, *args, **kwargs): with cls._lock: if cls._instance is None: # 第一次创建实例时调用父类的 __new__ 来创建实例 cls._instance = super(GlobalThreadPool, cls).__new__(cls) # 在此进行一次性的初始化,比如线程池的创建 max_workers = kwargs.get('max_workers', 10) cls._instance.executor = ThreadPoolExecutor(max_workers=max_workers) cls._instance.task_map = {} # 初始化任务映射 return cls._instance def submit_task(self, fn, *args, thread_id=None, **kwargs): """提交任务到线程池,并记录线程 ID""" if thread_id is None: thread_id = generate_thread_id() if self.check_task_is_running(thread_id): raise ValueError(f"线程 ID {thread_id} 已存在") future = self.executor.submit(fn, *args, **kwargs) self.task_map[thread_id] = future # 记录线程 ID 和 Future 对象的映射 return thread_id def check_task_is_running(self, thread_id): future = self.task_map.get(thread_id) if future: if future.running(): return True else: del self.task_map[thread_id] return False else: return False def check_task_stopped(self, thread_id): """判断任务是否已停止""" future = self.task_map.get(thread_id) if future: if future.done(): try: # 确保任务是正常完成的,而不是因为异常停止 future.result() # 如果任务抛出异常,这里会捕获 logger.info(f"Task {thread_id} has stopped successfully.") except Exception as e: logger.error(f"Task {thread_id} encountered an error: {e}") return True # 无论成功还是失败,任务已停止 else: return False # 任务仍在运行 else: logger.warning(f"No task found with thread ID {thread_id}.") return True # 如果找不到该任务,认为它已经停止(或者不存在) def stop_task(self, thread_id): """todo [可能不生效,需要控制线程任务里的标志位让它停止] 停止指定线程 ID 的任务""" future = self.task_map.get(thread_id) if future: future.cancel() # 尝试取消任务 logger.info(f"任务 {thread_id} 已取消") del self.task_map[thread_id] # 从任务映射中删除 else: logger.info(f"未找到线程 ID {thread_id}") def shutdown(self, wait=True): """关闭线程池""" self.executor.shutdown(wait=wait) GlobalThreadPool._instance = None