import requests import threading from typing import Dict, Any, Callable class HttpTool: def __init__(self): """初始化HTTP工具类""" def post(self, url: str, data: Dict[str, Any] = None, headers: Dict[str, str] = None, callback: Callable[[requests.Response], None] = None): if callback is None: callback = self.default_callback """ 异步POST请求 :param url: 请求URL :param data: POST的数据 :param headers: 请求头 :param callback: 回调函数,接收response对象 """ # 定义子线程的目标函数 def _post_request(): try: response = requests.post(url, json=data, headers=headers) if callback: callback(response) except Exception as e: print(f"HTTP POST 请求失败: {e}") if callback: callback(None) # 创建并启动子线程 thread = threading.Thread(target=_post_request) thread.daemon = True # 设置守护线程 thread.start() def default_callback(self, response: requests.Response): """ 默认回调函数 :param response: HTTP响应对象 """ if response is not None: try: print(f"状态码: {response.status_code}, 响应内容: {response.json()}") except ValueError: print(f"状态码: {response.status_code}, 响应内容: {response.text}") else: print("请求失败或无响应")