class BaseInfo: def from_dict(self, data: dict): """递归动态初始化对象""" for key, value in data.items(): if hasattr(self, key): attr = getattr(self, key) # 如果是嵌套对象,递归调用 from_dict if isinstance(attr, BaseInfo): attr.from_dict(value) # 如果是列表并且需要初始化子对象 elif isinstance(attr, list) and value and isinstance(value[0], dict): obj_type = self._get_list_type(key) setattr(self, key, [obj_type().from_dict(item) for item in value]) else: setattr(self, key, value) return self def _get_list_type(self, key): """动态获取列表中子对象的类型""" raise NotImplementedError("必须在子类中实现 _get_list_type 方法") def __str__(self): """打印对象的所有属性和值""" attributes = vars(self) return f"{self.__class__.__name__}({', '.join(f'{k}={v}' for k, v in attributes.items())})" class DeviceInfo(BaseInfo): def __init__(self) -> None: self.device_id = 0 self.device_no = 0 self.device_type = '' self.input_url = '' self.output_url = '' self.recognition_interval = -1 self.alarm_interval = -1 self.mode_type = -1 class ModelObject(BaseInfo): def __init__(self) -> None: self.model_code = '' self.object_code = '' self.object_name = '' self.conf_threshold = 0.5 self.alarm_threshold = 0 self.range = None class ModelInfo(BaseInfo): def __init__(self) -> None: self.model_code = '' self.model_name = '' self.model_path = '' self.handle_task = '' self.objects = [] def _get_list_type(self, key): if key == "objects": return ModelObject INFO_SOURCE = 'mock' # server HTTP_SERVER = 'http://192.168.83.42:6909' import json import requests import subprocess from http_tool import HttpTool def get_box_sn(): command = "mcutools -s" try: # 执行命令 result = subprocess.run( command, shell=True, # 使用 shell 执行命令 capture_output=True, # 捕获标准输出和错误 text=True # 将输出作为文本而不是字节 ) # 检查返回码 if result.returncode == 0: return result.stdout.strip() else: print(f' "error": {result.stderr.strip()}, "returncode": {result.returncode}') return '' except Exception as e: print(e) return '' def query_device_info(box_sn): if INFO_SOURCE == 'mock': file_path = './mock/device.json' try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) device_list = [] for item in data: device = DeviceInfo() device.from_dict(item) device_list.append(device) return device_list except FileNotFoundError: print(f"File not found: {file_path}") return [] except json.JSONDecodeError: print("Error decoding JSON file.") return [] else: url = HTTP_SERVER + f"/device/list?box_code={box_sn}" try: response = requests.get(url, timeout=3) if response.status_code == 200: data = response.json() device_list = [] for item in data: device = DeviceInfo() device.from_dict(item) device_list.append(device) return device_list else: print(f"HTTP request failed with status code {response.status_code}") return [] except requests.RequestException as e: print(f"HTTP request error: {e}") return [] def query_device_model(device_id): if INFO_SOURCE == 'mock': file_path = './mock/device_model.json' try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) model_list = [] for item in data: if item["device_id"] == device_id: model = ModelInfo() model.from_dict(item) model_list.append(model) return model_list except FileNotFoundError: print(f"File not found: {file_path}") return [] except json.JSONDecodeError: print("Error decoding JSON file.") return [] else: url = HTTP_SERVER + f"/model/list?deviceId={device_id}" try: response = requests.get(url, timeout=3) if response.status_code == 200: data = response.json() model_list = [] for item in data: model = ModelInfo() model.from_dict(item) model_list.append(model) return model_list else: print(f"HTTP request failed with status code {response.status_code}") return [] except requests.RequestException as e: print(f"HTTP request error: {e}") return [] def alarm_upload(data): http_tool = HttpTool() url = HTTP_SERVER + '/safe-server/record/upload' http_tool.post(url, data)