import requests import threading from typing import Dict, Any, Callable import time from global_logger import logger from common import get_box_sn from rsa_utils import encrypt_message_with_public_key from constants import LOGIN_URI, SERVER_BASE_URL, SERVER_PUBLIC_KEY import json import copy # 限制打印的长度 def truncate_data(data, max_length=500): data_copy = copy.deepcopy(data) if isinstance(data_copy, dict): return {k: truncate_data(v, max_length) for k, v in data_copy.items()} elif isinstance(data_copy, list): return [truncate_data(item, max_length) for item in data_copy] elif isinstance(data_copy, str) and len(data_copy) > max_length: return data_copy[:max_length] + "...(truncated)" else: return data_copy class HttpTool: def __init__(self, box_sn=None): """初始化HTTP工具类""" self.box_sn = box_sn self.server_token = None self.token_ts = None self.token_valid_period = 24 * 60 * 60 self._fetching_token = False self._token_lock = threading.Lock() def _is_token_valid(self) -> bool: """简单判断当前 token 是否还在有效期内""" if self.server_token is None or self.token_ts is None: return False return (time.time() - self.token_ts) < self.token_valid_period def _get_token_params(self): box_sn = self.box_sn or get_box_sn() msg = f'{box_sn}_{int(time.time() * 1000)}' logger.debug(f'token msg = {msg}') secret = encrypt_message_with_public_key(SERVER_PUBLIC_KEY, msg) url = f'{SERVER_BASE_URL}{LOGIN_URI}' params = {'secret': secret, 'code':box_sn} return url, params # --------------------------------------------------------- # 1) 非阻塞的 fetch_token,完成后通过回调通知结果 # --------------------------------------------------------- def fetch_token_async( self, success_cb: Callable[[str], None], fail_cb: Callable[[Exception], None] = None ): """ 非阻塞获取 token:在子线程中拉取新的 token, 获取成功后调用 success_cb(token_str);失败则调用 fail_cb(exception)。 """ def token_callback(response: requests.Response): res = response.json() if res.get('code') == 200: new_token = res['data'] with self._token_lock: self.server_token = new_token self.token_ts = time.time() # 成功回调 if success_cb: success_cb(new_token) else: # 失败 if fail_cb: fail_cb(Exception(f"Fetch token failed, response={res}")) url, params = self._get_token_params() t = threading.Thread(target=self.post, args=(url,params,None,None,False,token_callback), daemon=True) t.start() # --------------------------------------------------------- # 2) 真正发起POST请求的函数(子线程),完成后回调 # --------------------------------------------------------- def _do_post_request( self, url: str, data: Dict[str, Any], params: Dict[str, str], headers: Dict[str, str], callback: Callable[[requests.Response], None] ): """ 在子线程中发起 HTTP POST 请求 """ try: logger.info(f'POST: url={url} params={params} headers={headers} data={truncate_data(data)}') response = requests.post(url, json=data, headers=headers, params=params) if callback: callback(response) except Exception as e: logger.exception(f"HTTP POST 请求失败: {e}") if callback: callback(None) # --------------------------------------------------------- # 3) post:若 token 无效,先 fetch_token_async,然后在其成功回调里再发请求 # --------------------------------------------------------- def post(self, url: str, data: Dict[str, Any] = None, params: Dict[str, str] = None, headers: Dict[str, str] = None, need_token: bool = True, callback: Callable[[requests.Response], None] = None): """ 先检查 token,如果无效,就先异步获取 token,获取成功后再发起 HTTP 请求; 获取/发送都在子线程,不会阻塞调用此 post 的主线程。 """ if callback is None: callback = self.default_callback if not need_token: # 不需要 token,直接发请求(异步) thread = threading.Thread( target=self._do_post_request, args=(url, data or {},params or {},headers or {},callback), daemon=True ) thread.start() return # 需要 token 的情况 # 1) 如果 token 有效,直接发请求 if self._is_token_valid(): # 构造带token的headers merged_headers = (headers or {}).copy() merged_headers['Authorization'] = self.server_token thread = threading.Thread( target=self._do_post_request, args=(url, data or {},params or {}, merged_headers, callback), daemon=True ) thread.start() else: # 2) token 无效,先异步获取 token,然后在成功回调里再发起请求 def on_fetch_success(new_token: str): # 再发起请求 merged_headers = (headers or {}).copy() merged_headers['Authorization'] = new_token thread = threading.Thread( target=self._do_post_request, args=(url, data or {}, params or {}, merged_headers, callback), daemon=True ) thread.start() def on_fetch_fail(e: Exception): logger.error(f"Token 获取失败: {e}") # 根据业务需求,可以在这里 callback(None) 或者返回错误等 if callback: callback(None) self.fetch_token_async( success_cb=on_fetch_success, fail_cb=on_fetch_fail ) def get(self, url: str, params: Dict[str, str] = None, headers: Dict[str, str] = None, need_token: bool = True) -> requests.Response: """ Synchronous GET request with optional token handling. """ merged_headers = (headers or {}).copy() if need_token: if not self._is_token_valid(): token_url, token_params = self._get_token_params() logger.info(f'POST: url={token_url} json={token_params}') token_response = requests.post(url=token_url, json=token_params) if token_response is None: logger.error('获取token失败') return None res = token_response.json() logger.debug(res) if res.get('code') == 200: new_token = res['data'] with self._token_lock: self.server_token = new_token self.token_ts = time.time() merged_headers['Authorization'] = self.server_token logger.info(f'GET: {url} {params} {merged_headers}') response = requests.get(url, params=params, headers=merged_headers) if response is not None: return response.json() else: logger.error("请求失败或无响应") def default_callback(self, response: requests.Response): """ 默认回调函数 """ if response is not None: try: logger.info(f"状态码: {response.status_code}, 响应内容: {response.json()}") except ValueError: logger.error(f"状态码: {response.status_code}, 响应内容: {response.text}") else: logger.error("请求失败或无响应") if __name__ == "__main__": tool = HttpTool() url = 'http://192.168.83.33:9001/test' data = { 'a': '1' } # tool.post(url=url,data=data,need_token=True) url = 'http://192.168.83.33:9001/list' r = tool.get(url,need_token=True) print(r) time.sleep(5)