Newer
Older
lynxi-casic-demo / http_tool.py
import requests
import threading
from typing import Dict, Any, Callable


class HttpTool:
    def __init__(self):
        """初始化HTTP工具类"""

    def post(self, url: str, data: Dict[str, Any] = None, headers: Dict[str, str] = None,
             callback: Callable[[requests.Response], None] = None):
        
        if callback is None:
            callback = self.default_callback
        """
        异步POST请求
        :param url: 请求URL
        :param data: POST的数据
        :param headers: 请求头
        :param callback: 回调函数,接收response对象
        """
        # 定义子线程的目标函数
        def _post_request():
            try:
                response = requests.post(url, json=data, headers=headers)
                if callback:
                    callback(response)
            except Exception as e:
                print(f"HTTP POST 请求失败: {e}")
                if callback:
                    callback(None)

        # 创建并启动子线程
        thread = threading.Thread(target=_post_request)
        thread.daemon = True  # 设置守护线程
        thread.start()

    def default_callback(self, response: requests.Response):
        """
        默认回调函数
        :param response: HTTP响应对象
        """
        if response is not None:
            try:
                print(f"状态码: {response.status_code}, 响应内容: {response.json()}")
            except ValueError:
                print(f"状态码: {response.status_code}, 响应内容: {response.text}")
        else:
            print("请求失败或无响应")