Newer
Older
safe-algo-pro / common / global_thread_pool.py
zhangyingjie on 12 Oct 2 KB first commit
import uuid
from concurrent.futures import ThreadPoolExecutor
import threading


def generate_thread_id():
    """生成唯一的线程 ID"""
    return str(uuid.uuid4())


class GlobalThreadPool:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if cls._instance is None:
                # 第一次创建实例时调用父类的 __new__ 来创建实例
                cls._instance = super(GlobalThreadPool, cls).__new__(cls)
                # 在此进行一次性的初始化,比如线程池的创建
                max_workers = kwargs.get('max_workers', 10)
                cls._instance.executor = ThreadPoolExecutor(max_workers=max_workers)
                cls._instance.task_map = {}  # 初始化任务映射
            return cls._instance

    def submit_task(self, fn, *args, thread_id=None, **kwargs):
        """提交任务到线程池,并记录线程 ID"""
        if thread_id is None:
            thread_id = generate_thread_id()
        if self.check_task_is_running(thread_id):
            raise ValueError(f"线程 ID {thread_id} 已存在")
        future = self.executor.submit(fn, *args, **kwargs)
        self.task_map[thread_id] = future  # 记录线程 ID 和 Future 对象的映射
        return thread_id

    def check_task_is_running(self, thread_id):
        future = self.task_map.get(thread_id)
        if future:
            if future.running():
                return True
            else:
                del self.task_map[thread_id]
                return False
        else:
            return False

    def stop_task(self, thread_id):
        """todo [可能不生效,需要控制线程任务里的标志位让它停止] 停止指定线程 ID 的任务"""
        future = self.task_map.get(thread_id)
        if future:
            future.cancel()  # 尝试取消任务
            print(f"任务 {thread_id} 已取消")
            del self.task_map[thread_id]  # 从任务映射中删除
        else:
            print(f"未找到线程 ID {thread_id}")

    def shutdown(self, wait=True):
        """关闭线程池"""
        self.executor.shutdown(wait=wait)
        GlobalThreadPool._instance = None