from typing import Optional, get_type_hints class BaseInfo: # 定义字段映射,子类可重写 field_mapping = {} def from_dict(self, data: dict): """递归动态初始化对象""" hints = get_type_hints(self.__init__) print(f'h={hints}') for key, value in data.items(): attr_key = self.field_mapping.get(key, key) if hasattr(self, attr_key): attr = getattr(self, attr_key) print(f'attr={attr}, attr_key={attr_key}') # 处理嵌套对象,如果为 None 则初始化 if attr is None: print('准备初始化嵌套对象...') # 获取类型注解 attr_type = hints.get(attr_key, None) print(f'获取到的属性类型: {attr_type}') # 处理 Optional[T] 类型 if attr_type and hasattr(attr_type, '__origin__') and attr_type.__origin__ is Optional: attr_type = attr_type.__args__[0] # 提取 Optional 内部类型 if attr_type and isinstance(attr_type, type) and issubclass(attr_type, BaseInfo): print(f'初始化嵌套对象: {attr_key} -> {attr_type}') attr = attr_type() # 初始化嵌套对象 setattr(self, attr_key, attr) # 如果是嵌套对象,递归调用 from_dict if isinstance(attr, BaseInfo): print(f'1 {attr_key}') attr.from_dict(value) # 如果是列表并且需要初始化子对象 elif isinstance(attr, list) and value and isinstance(value[0], dict): print(f'2 {attr_key}') obj_type = self._get_list_type(attr_key) setattr(self, attr_key, [obj_type().from_dict(item) for item in value]) else: print(f'3 {attr_key}') setattr(self, attr_key, value) return self # def from_dict(self, data: dict): # for key, value in data.items(): # attr_key = self.field_mapping.get(key, key) # if hasattr(self, attr_key): # attr = getattr(self, attr_key) # # 处理嵌套对象 # if isinstance(attr, BaseInfo) or (attr is None and attr_key in self.__annotations__): # print(f'解析嵌套对象: {attr_key}') # obj_type = self.__annotations__.get(attr_key) # if attr is None and obj_type: # attr = obj_type() # attr.from_dict(value) # setattr(self, attr_key, attr) # # 处理列表对象 # elif isinstance(attr, list) and value and isinstance(value[0], dict): # print(f'解析对象列表: {attr_key}') # obj_type = self._get_list_type(attr_key) # setattr(self, attr_key, [obj_type().from_dict(item) for item in value]) # # 直接赋值 # else: # print(f'直接赋值: {attr_key}') # setattr(self, attr_key, value) # return self def _get_list_type(self, key): """动态获取列表中子对象的类型""" raise NotImplementedError("必须在子类中实现 _get_list_type 方法") def __str__(self): """打印对象的所有属性和值""" attributes = vars(self) return f"{self.__class__.__name__}({', '.join(f'{k}={v}' for k, v in attributes.items())})" class DeviceInfo(BaseInfo): field_mapping = { 'deviceId': 'device_id', 'cameraIndexCode': 'device_no', 'rtspUrl': 'input_url', 'recognitionUrl': 'output_url', 'recognitionInterval': 'recognition_interval', 'alarmInterval': 'alarm_interval', 'type': 'mode_type', 'algoModels': 'model_relations', 'sceneRelation': 'scene_relation', } def __init__(self) -> None: self.device_id = 0 self.device_no = 0 self.input_url = '' self.output_url = '' self.recognition_interval = -1 self.alarm_interval = -1 self.mode_type = -1 # self.model_relations = [] self.scene_relation : Optional[SceneInfo] = None def _get_list_type(self, key): if key == "model_relations": return ModelInfo class ModelObject(BaseInfo): field_mapping = { 'algoModelId': 'model_id', 'code': 'model_code', 'typeCode': 'object_code', 'remark': 'object_name', 'isUse': 'is_use', 'confidenceLevel': 'conf_threshold', 'threshold': 'alarm_threshold', 'boundary': 'range', } def __init__(self) -> None: self.model_id = 0 self.model_code = '' self.object_code = '' self.object_name = '' self.is_use = 0 self.conf_threshold = 0.5 self.alarm_threshold = 0 self.range = None class ModelInfo(BaseInfo): field_mapping = { 'id': 'model_id', 'code': 'model_code', 'modelName': 'model_name', 'path': 'model_path', 'handleTask': 'handle_task', 'remark': 'remark', 'subObjects': 'objects', 'version': 'version' } def __init__(self) -> None: self.model_id = 0 self.model_code = '' self.model_name = '' self.model_path = '' self.handle_task = '' self.remark = '' self.version = '' self.objects = [] def _get_list_type(self, key): if key == "objects": return ModelObject class SceneInfo(BaseInfo): field_mapping = { 'sceneId': 'scene_id', 'sceneCode': 'scene_code', 'handleTask': 'handle_task', 'boundary': 'boundary', 'remark': 'remark', 'version': 'version', } def __init__(self) -> None: self.scene_id = 0 self.scene_code = '' self.handle_task = '' self.boundary = '' self.remark = '' self.version = '' INFO_SOURCE = 'server' # server HTTP_SERVER = 'http://192.168.83.42:6909' import json import requests from constants import INFO_QUERY_URI, SERVER_BASE_URL from http_tool import HttpTool def query_device_info(box_sn): if INFO_SOURCE == 'mock': file_path = './mock/device.json' try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) device_list = [] for item in data: device = DeviceInfo() device.from_dict(item) device_list.append(device) return device_list except FileNotFoundError: print(f"File not found: {file_path}") return [] except json.JSONDecodeError: print("Error decoding JSON file.") return [] else: url = f"{SERVER_BASE_URL}{INFO_QUERY_URI}" params = {'code': box_sn} try: http_tool = HttpTool(box_sn=box_sn) res_data = http_tool.get(url, params=params, need_token=True) if res_data: print(json.dumps(res_data, ensure_ascii=False)) device_list = [] data = res_data['data'] for item in data: device = DeviceInfo() device.from_dict(item) device_list.append(device) return device_list else: print(f"fail to query device info") return [] except requests.RequestException as e: print(f"HTTP request error: {e}") return [] def query_device_model(device_id): if INFO_SOURCE == 'mock': file_path = './mock/device_model.json' try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) model_list = [] for item in data: if item["device_id"] == device_id: model = ModelInfo() model.from_dict(item) model_list.append(model) return model_list except FileNotFoundError: print(f"File not found: {file_path}") return [] except json.JSONDecodeError: print("Error decoding JSON file.") return [] else: url = HTTP_SERVER + f"/model/list?deviceId={device_id}" try: response = requests.get(url, timeout=3) if response.status_code == 200: data = response.json() model_list = [] for item in data: model = ModelInfo() model.from_dict(item) model_list.append(model) return model_list else: print(f"HTTP request failed with status code {response.status_code}") return [] except requests.RequestException as e: print(f"HTTP request error: {e}") return [] if __name__ == "__main__": device = DeviceInfo() # print(DeviceInfo.__annotations__) # 这里不会包含 scene_relation print(device.__dict__) hints = get_type_hints(DeviceInfo.__init__) print(hints) print(hints.get('scene_relation'))