Newer
Older
lynxi-casic-demo / http_tool.py
zhangyingjie on 24 Jan 8 KB 增加后台接口调用
import requests
import threading
from typing import Dict, Any, Callable
import time

from global_logger import logger
from common import get_box_sn
from rsa_utils import encrypt_message_with_public_key
from constants import LOGIN_URI, SERVER_BASE_URL, SERVER_PUBLIC_KEY
import json
import copy


# 限制打印的长度
def truncate_data(data, max_length=500):
    data_copy = copy.deepcopy(data)
    if isinstance(data_copy, dict):
        return {k: truncate_data(v, max_length) for k, v in data_copy.items()}
    elif isinstance(data_copy, list):
        return [truncate_data(item, max_length) for item in data_copy]
    elif isinstance(data_copy, str) and len(data_copy) > max_length:
        return data_copy[:max_length] + "...(truncated)"
    else:
        return data_copy

class HttpTool:

    def __init__(self, box_sn=None):
        """初始化HTTP工具类"""
        self.box_sn = box_sn
        self.server_token = None
        self.token_ts = None
        self.token_valid_period = 24 * 60 * 60

        self._fetching_token = False
        self._token_lock = threading.Lock()

    def _is_token_valid(self) -> bool:
        """简单判断当前 token 是否还在有效期内"""
        if self.server_token is None or self.token_ts is None:
            return False
        return (time.time() - self.token_ts) < self.token_valid_period
    
    def _get_token_params(self):
        box_sn = self.box_sn or get_box_sn()
        msg = f'{box_sn}_{int(time.time() * 1000)}'
        logger.debug(f'token msg = {msg}')
        secret = encrypt_message_with_public_key(SERVER_PUBLIC_KEY, msg)
        url = f'{SERVER_BASE_URL}{LOGIN_URI}'
        params = {'secret': secret, 'code':box_sn}
        return url, params

    # ---------------------------------------------------------
    # 1) 非阻塞的 fetch_token,完成后通过回调通知结果
    # ---------------------------------------------------------
    def fetch_token_async(
        self,
        success_cb: Callable[[str], None],
        fail_cb: Callable[[Exception], None] = None
    ):
        """
        非阻塞获取 token:在子线程中拉取新的 token,
        获取成功后调用 success_cb(token_str);失败则调用 fail_cb(exception)。
        """
        def token_callback(response: requests.Response):
            res = response.json()
            if res.get('code') == 200:
                new_token = res['data']
                with self._token_lock:
                    self.server_token = new_token
                    self.token_ts = time.time()
                # 成功回调
                if success_cb:
                    success_cb(new_token)
            else:
                # 失败
                if fail_cb:
                    fail_cb(Exception(f"Fetch token failed, response={res}"))


        url, params = self._get_token_params()
        t = threading.Thread(target=self.post, args=(url,params,None,None,False,token_callback), daemon=True)
        t.start()

    # ---------------------------------------------------------
    # 2) 真正发起POST请求的函数(子线程),完成后回调
    # ---------------------------------------------------------




    def _do_post_request(
        self,
        url: str,
        data: Dict[str, Any],
        params: Dict[str, str],
        headers: Dict[str, str],
        callback: Callable[[requests.Response], None]
    ):
        """
        在子线程中发起 HTTP POST 请求
        """
        try:
            logger.info(f'POST: url={url} params={params} headers={headers} data={truncate_data(data)}')
            response = requests.post(url, json=data, headers=headers, params=params)
            if callback:
                callback(response)
        except Exception as e:
            logger.exception(f"HTTP POST 请求失败: {e}")
            if callback:
                callback(None)

    # ---------------------------------------------------------
    # 3) post:若 token 无效,先 fetch_token_async,然后在其成功回调里再发请求
    # ---------------------------------------------------------
    def post(self,
             url: str,
             data: Dict[str, Any] = None,
             params: Dict[str, str] = None,
             headers: Dict[str, str] = None,
             need_token: bool = True,
             callback: Callable[[requests.Response], None] = None):
        """
        先检查 token,如果无效,就先异步获取 token,获取成功后再发起 HTTP 请求;
        获取/发送都在子线程,不会阻塞调用此 post 的主线程。
        """
        if callback is None:
            callback = self.default_callback

        if not need_token:
            # 不需要 token,直接发请求(异步)
            thread = threading.Thread(
                target=self._do_post_request,
                args=(url, data or {},params or {},headers or {},callback),
                daemon=True
            )
            thread.start()
            return

        # 需要 token 的情况
        # 1) 如果 token 有效,直接发请求
        if self._is_token_valid():
            # 构造带token的headers
            merged_headers = (headers or {}).copy()
            merged_headers['Authorization'] = self.server_token

            thread = threading.Thread(
                target=self._do_post_request,
                args=(url, data or {},params or {}, merged_headers, callback),
                daemon=True
            )
            thread.start()
        else:
            # 2) token 无效,先异步获取 token,然后在成功回调里再发起请求
            def on_fetch_success(new_token: str):
                # 再发起请求
                merged_headers = (headers or {}).copy()
                merged_headers['Authorization'] = new_token
                thread = threading.Thread(
                    target=self._do_post_request,
                    args=(url, data or {}, params  or {}, merged_headers, callback),
                    daemon=True
                )
                thread.start()

            def on_fetch_fail(e: Exception):
                logger.error(f"Token 获取失败: {e}")
                # 根据业务需求,可以在这里 callback(None) 或者返回错误等
                if callback:
                    callback(None)

            self.fetch_token_async(
                success_cb=on_fetch_success,
                fail_cb=on_fetch_fail
            )


    def get(self,
            url: str,
            params: Dict[str, str] = None,
            headers: Dict[str, str] = None,
            need_token: bool = True) -> requests.Response:
        """
        Synchronous GET request with optional token handling.
        """
        merged_headers = (headers or {}).copy()
        if need_token:
            if not self._is_token_valid():
                token_url, token_params = self._get_token_params()
                logger.info(f'POST: url={token_url} json={token_params}')
                token_response = requests.post(url=token_url, json=token_params)
                if token_response is None:
                    logger.error('获取token失败')
                    return None
                res = token_response.json()
                logger.debug(res)
                if res.get('code') == 200:
                    new_token = res['data']
                    with self._token_lock:
                        self.server_token = new_token
                        self.token_ts = time.time()
            merged_headers['Authorization'] = self.server_token
        logger.info(f'GET: {url} {params} {merged_headers}')
        response =  requests.get(url, params=params, headers=merged_headers)
        if response is not None:
            return response.json()
        else:
            logger.error("请求失败或无响应")

    def default_callback(self, response: requests.Response):
        """
        默认回调函数
        """
        if response is not None:
            try:
                logger.info(f"状态码: {response.status_code}, 响应内容: {response.json()}")
            except ValueError:
                logger.error(f"状态码: {response.status_code}, 响应内容: {response.text}")
        else:
            logger.error("请求失败或无响应")


if __name__ == "__main__":
    tool = HttpTool()
    url = 'http://192.168.83.33:9001/test'
    data = {
        'a': '1'
    }
    # tool.post(url=url,data=data,need_token=True)
    
    url = 'http://192.168.83.33:9001/list'
    r = tool.get(url,need_token=True)
    print(r)
    time.sleep(5)