import json
from dataclasses import MISSING, dataclass, field, fields
from typing import Any, Dict, List, Optional, Type, get_type_hints


def _resolve_default(field_info):
    """Return the declared default of a dataclass field, or None if it has none."""
    # MISSING is a sentinel object, so it is compared by identity.
    if field_info.default is not MISSING:
        return field_info.default
    if field_info.default_factory is not MISSING:
        return field_info.default_factory()
    # Global fallback for required fields that are absent from the input.
    return None


def _coerce_int(value):
    """Convert a JSON scalar to int without routing exact integers through float."""
    try:
        # Exact path: keeps integers above 2**53 (e.g. large numeric IDs) intact.
        return int(value)
    except ValueError:
        # Strings such as "3.0" or "1e3" are only parseable via float.
        return int(float(value))


def _coerce_list(field_type, value):
    """Convert the raw value of a ``List[...]`` field."""
    if not value:
        return []
    if isinstance(value, str):
        # Lists may arrive JSON-encoded inside a string.
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            pass  # Parsing failed: keep the raw string.
    type_args = getattr(field_type, '__args__', None) or ()
    item_type = type_args[0] if type_args else None
    # Only map over real sequences; a raw string must not be split into characters.
    if isinstance(value, (list, tuple)) and hasattr(item_type, 'from_dict'):
        return [
            item_type.from_dict(item) if isinstance(item, dict) else item
            for item in value
        ]
    return value


def _convert_value(field_type, value, default_value):
    """Convert ``value`` to ``field_type`` following the rules of ``BaseInfo.from_dict``."""
    # Nested object.
    if isinstance(value, dict):
        if hasattr(field_type, 'from_dict'):
            return field_type.from_dict(value)
        return value
    # List of objects or plain values.
    if getattr(field_type, '__origin__', None) is list:
        return _coerce_list(field_type, value)
    # Scalar type conversion; None and '' mean "no value" for numeric fields.
    if field_type is float:
        return float(value) if value not in (None, '') else default_value
    if field_type is int:
        return _coerce_int(value) if value not in (None, '') else default_value
    if field_type is bool:
        if isinstance(value, str):
            return value.lower() == 'true'
        return bool(value)
    return value


class BaseInfo:
    """Mixin for dataclasses that are populated from JSON-style dictionaries.

    A field may declare ``metadata={'json_key': ...}`` to name the dictionary
    key it is read from; without it the attribute name is used as the key.
    """

    @classmethod
    def from_dict(cls: Type['BaseInfo'], data: Dict[str, Any]) -> 'BaseInfo':
        """Build an instance of ``cls`` from ``data``.

        For every dataclass field the value is looked up under the field's
        JSON key. Missing keys take the field's default (``None`` when the
        field declares none). Dict values are converted through the field
        type's ``from_dict`` when it has one, ``List[...]`` fields are
        converted element by element (a JSON-encoded string is decoded
        first), and ``float`` / ``int`` / ``bool`` fields are coerced.

        Args:
            data: Mapping of JSON keys to raw values.

        Returns:
            A new instance of ``cls``. A value that cannot be converted
            (ValueError, TypeError or OverflowError) is replaced by the
            field's default instead of aborting the whole object.
        """
        init_data = {}
        for field_info in fields(cls):
            json_key = field_info.metadata.get('json_key', field_info.name)
            default_value = _resolve_default(field_info)
            field_value = data.get(json_key, default_value)
            try:
                init_data[field_info.name] = _convert_value(
                    field_info.type, field_value, default_value
                )
            except (ValueError, TypeError, OverflowError):
                # Fall back to the default when conversion fails.
                init_data[field_info.name] = default_value
        return cls(**init_data)

    def __str__(self):
        """Return ``ClassName(attr=value, ...)`` listing every instance attribute."""
        attributes = vars(self)
        return f"{self.__class__.__name__}({', '.join(f'{k}={v}' for k, v in attributes.items())})"


@dataclass
class BoundaryPoint(BaseInfo):
    """A single point with ``x`` and ``y`` coordinates."""

    x: float
    y: float


@dataclass
class ModelObject(BaseInfo):
    """One recognition object entry of a model, read from its JSON keys."""

    model_id: int = field(metadata={'json_key': 'algoModelId'})
    model_code: str = field(metadata={'json_key': 'code'})
    object_code: str = field(metadata={'json_key': 'recognitionTypeId'})
    object_name: str = field(metadata={'json_key': 'recognitionTypeName'})
    range: List[BoundaryPoint] = field(metadata={'json_key': 'boundary'})
    is_use: int = field(metadata={'json_key': 'isUse'}, default=0)
    conf_threshold: float = field(metadata={'json_key': 'confidenceLevel'}, default=0.5)
    alarm_threshold: int = field(metadata={'json_key': 'threshold'}, default=0)


@dataclass
class ModelInfo(BaseInfo):
    """Model description together with its list of ``ModelObject`` entries."""

    model_id: int = field(metadata={'json_key': 'id'})
    model_code: str = field(metadata={'json_key': 'code'})
    model_name: str = field(metadata={'json_key': 'modelName'})
    model_path: str = field(metadata={'json_key': 'path'})
    handle_task: str = field(metadata={'json_key': 'handleTask'})
    remark: str = field(metadata={'json_key': 'remark'})
    version: str = field(metadata={'json_key': 'version'})
    objects: List[ModelObject] = field(metadata={'json_key': 'subObjects'})

    def _get_list_type(self, key):
        """Return the element class of the list attribute ``key``, or None if unknown."""
        if key == "objects":
            return ModelObject
        return None


@dataclass
class SceneInfo(BaseInfo):
    """Scene description, including its boundary points."""

    scene_id: int = field(metadata={'json_key': 'sceneId'})
    scene_code: str = field(metadata={'json_key': 'sceneCode'})
    handle_task: str = field(metadata={'json_key': 'handleTask'})
    range: List[BoundaryPoint] = field(metadata={'json_key': 'boundary'})
    remark: str = field(metadata={'json_key': 'remark'})
    version: str = field(metadata={'json_key': 'version'})
    scene_path: str = field(metadata={'json_key': 'path'})


@dataclass
class DeviceInfo(BaseInfo):
    """Device entry with its stream URLs, related models and related scene."""

    device_id: int = field(metadata={'json_key': 'deviceId'})
    device_no: str = field(metadata={'json_key': 'cameraIndexCode'})
    input_url: str = field(metadata={'json_key': 'rtspUrl'})
    output_url: str = field(metadata={'json_key': 'recognitionUrl'})
    model_relations: List[ModelInfo] = field(metadata={'json_key': 'algoModels'})
    scene_relation: SceneInfo = field(metadata={'json_key': 'sceneRelation'})
    recognition_interval: int = field(metadata={'json_key': 'recognitionInterval'}, default=-1)
    alarm_interval: int = field(metadata={'json_key': 'alarmInterval'}, default=-1)
    mode_type: int = field(metadata={'json_key': 'type'}, default=-1)


INFO_SOURCE = 'mock'  # mock/server

# NOTE(review): these imports stay below the data classes, as in the original
# layout, so the module's import order is unchanged. Hoist them to the top of
# the file once it is confirmed that none of them import this module back.
import requests

from constants import INFO_QUERY_URI, SERVER_BASE_URL
from http_tool import HttpTool
from global_logger import logger


def _parse_device_list(items):
    """Turn a decoded JSON array of device dicts into ``DeviceInfo`` objects."""
    if not isinstance(items, list):
        logger.error(f"unexpected device list payload: {type(items).__name__}")
        return []
    device_list = []
    for item in items:
        if not isinstance(item, dict):
            # from_dict needs a mapping; skip malformed entries instead of crashing.
            logger.warning(f"skip malformed device entry: {item!r}")
            continue
        device_list.append(DeviceInfo.from_dict(item))
    return device_list


def query_device_info(box_sn):
    """Return the devices configured for the box identified by ``box_sn``.

    With ``INFO_SOURCE == 'mock'`` the devices are read from a local JSON
    fixture; otherwise they are requested from the server through
    ``HttpTool``, passing ``box_sn`` as the ``code`` query parameter.

    Args:
        box_sn: Box serial number.

    Returns:
        A list of ``DeviceInfo`` objects. An empty list is returned when the
        fixture is missing or is not valid JSON, when the HTTP request fails,
        or when the response carries no usable ``data`` list.
    """
    if INFO_SOURCE == 'mock':
        # Alternative fixture: './mock/device_server.json'
        file_path = './mock/device_server_scene.json'
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            logger.warning(f"File not found: {file_path}")
            return []
        except json.JSONDecodeError:
            logger.warning("Error decoding JSON file.")
            return []
        return _parse_device_list(data)

    url = f"{SERVER_BASE_URL}{INFO_QUERY_URI}"
    params = {'code': box_sn}
    try:
        http_tool = HttpTool(box_sn=box_sn)
        res_data = http_tool.get(url, params=params, need_token=True)
    except requests.RequestException as e:
        logger.error(f"HTTP request error: {e}")
        return []

    if not res_data:
        logger.error("fail to query device info")
        return []

    logger.debug(json.dumps(res_data, ensure_ascii=False))
    # A response without a 'data' entry is treated like any other failed query.
    data = res_data.get('data') if isinstance(res_data, dict) else None
    if data is None:
        logger.error("fail to query device info")
        return []
    return _parse_device_list(data)