import uuid from concurrent.futures import ThreadPoolExecutor import threading def generate_thread_id(): """生成唯一的线程 ID""" return str(uuid.uuid4()) class GlobalThreadPool: _instance = None _lock = threading.Lock() def __new__(cls, *args, **kwargs): with cls._lock: if cls._instance is None: # 第一次创建实例时调用父类的 __new__ 来创建实例 cls._instance = super(GlobalThreadPool, cls).__new__(cls) # 在此进行一次性的初始化,比如线程池的创建 max_workers = kwargs.get('max_workers', 10) cls._instance.executor = ThreadPoolExecutor(max_workers=max_workers) cls._instance.task_map = {} # 初始化任务映射 return cls._instance def submit_task(self, fn, *args, thread_id=None, **kwargs): """提交任务到线程池,并记录线程 ID""" if thread_id is None: thread_id = generate_thread_id() if self.check_task_is_running(thread_id): raise ValueError(f"线程 ID {thread_id} 已存在") future = self.executor.submit(fn, *args, **kwargs) self.task_map[thread_id] = future # 记录线程 ID 和 Future 对象的映射 return thread_id def check_task_is_running(self, thread_id): future = self.task_map.get(thread_id) if future: if future.running(): return True else: del self.task_map[thread_id] return False else: return False def stop_task(self, thread_id): """todo [可能不生效,需要控制线程任务里的标志位让它停止] 停止指定线程 ID 的任务""" future = self.task_map.get(thread_id) if future: future.cancel() # 尝试取消任务 print(f"任务 {thread_id} 已取消") del self.task_map[thread_id] # 从任务映射中删除 else: print(f"未找到线程 ID {thread_id}") def shutdown(self, wait=True): """关闭线程池""" self.executor.shutdown(wait=wait) GlobalThreadPool._instance = None