Newer
Older
lynxi-casic-demo / info_query.py
zhangyingjie on 24 Jan 7 KB 增加后台接口调用

from dataclasses import MISSING, dataclass, field, fields
from typing import Any, Dict, List, Optional, Type, get_type_hints



class BaseInfo:
    @classmethod
    def from_dict(cls: Type['BaseInfo'], data: Dict[str, Any]) -> 'BaseInfo':
        # obj_fields = {f.metadata.get('json_key', f.name): f.name for f in fields(cls)}
        obj_fields = {f.metadata.get('json_key', f.name): f for f in fields(cls)}
        init_data = {}

        # for json_key, attr_name in obj_fields.items():
        for json_key, field_info in obj_fields.items():

            attr_name = field_info.name
            field_type = field_info.type
            default_value = (
                field_info.default
                if field_info.default != MISSING
                else field_info.default_factory() if field_info.default_factory != MISSING
                else None  # 这里设置一个全局的默认值
            )
            field_value = data.get(json_key, default_value)

            # if json_key in data:
            try:
                # field_value = data[json_key]
                # field_type = next(f.type for f in fields(cls) if f.name == attr_name)
                # 处理嵌套对象
                if isinstance(field_value, dict):
                    if hasattr(field_type, 'from_dict'):
                        init_data[attr_name] = field_type.from_dict(field_value)
                    else:
                        init_data[attr_name] = field_value
                # 处理列表对象
                elif hasattr(field_type, '__origin__') and field_type.__origin__ == list:
                    if field_value:
                        if isinstance(field_value, str):  # 仅当 field_value 是字符串时解析
                            try:
                                field_value = json.loads(field_value)
                            except json.JSONDecodeError:
                                pass  # 如果解析失败,则保留原始值
                        list_item_type = field_type.__args__[0]
                        if hasattr(list_item_type, 'from_dict'):
                            init_data[attr_name] = [list_item_type.from_dict(item) if isinstance(item, dict) else item for item in field_value]
                        else:
                            init_data[attr_name] = field_value
                    else:
                        init_data[attr_name] = []

                # 类型转换处理
                elif field_type == float:
                    init_data[attr_name] = float(field_value) if field_value not in (None, '') else default_value
                elif field_type == int:
                    init_data[attr_name] = int(float(field_value)) if field_value not in (None, '') else default_value
                elif field_type == bool:
                    init_data[attr_name] = field_value.lower() == 'true' if isinstance(field_value, str) else bool(field_value)
                else:
                    init_data[attr_name] = field_value
            except (ValueError, TypeError):
                # 出现异常时回退到默认值
                init_data[attr_name] = default_value


        return cls(**init_data)

    def __str__(self):
        """打印对象的所有属性和值"""
        attributes = vars(self)
        return f"{self.__class__.__name__}({', '.join(f'{k}={v}' for k, v in attributes.items())})"

@dataclass
class BoundaryPoint(BaseInfo):
    x: float
    y: float

@dataclass
class ModelObject(BaseInfo):    
    model_id: int = field(metadata={'json_key': 'algoModelId'})
    model_code : str = field(metadata={'json_key': 'code'})
    object_code : str = field(metadata={'json_key': 'recognitionTypeId'})
    object_name : str = field(metadata={'json_key': 'recognitionTypeName'})
    range : List[BoundaryPoint] = field(metadata={'json_key': 'boundary'})
    is_use : int = field(metadata={'json_key': 'isUse'}, default=0)
    conf_threshold : float = field(metadata={'json_key': 'confidenceLevel'}, default=0.5)
    alarm_threshold : int = field(metadata={'json_key': 'threshold'}, default=0)
    
@dataclass
class ModelInfo(BaseInfo):
    model_id: int = field(metadata={'json_key': 'id'})
    model_code: str = field(metadata={'json_key': 'code'})
    model_name : str = field(metadata={'json_key': 'modelName'})
    model_path : str = field(metadata={'json_key': 'path'})
    handle_task : str = field(metadata={'json_key': 'handleTask'})
    remark : str = field(metadata={'json_key': 'remark'})
    version : str = field(metadata={'json_key': 'version'})
    objects : List[ModelObject] = field(metadata={'json_key': 'subObjects'})

    def _get_list_type(self, key):
        if key == "objects":
            return ModelObject

@dataclass        
class SceneInfo(BaseInfo):    
    scene_id: int = field(metadata={'json_key': 'sceneId'})
    scene_code: str = field(metadata={'json_key': 'sceneCode'})
    handle_task: str = field(metadata={'json_key': 'handleTask'})
    range: List[BoundaryPoint] = field(metadata={'json_key': 'boundary'})
    remark: str = field(metadata={'json_key': 'remark'})
    version: str = field(metadata={'json_key': 'version'})
    scene_path : str = field(metadata={'json_key': 'path'})

@dataclass
class DeviceInfo(BaseInfo):
    device_id: int = field(metadata={'json_key': 'deviceId'})
    device_no: str = field(metadata={'json_key': 'cameraIndexCode'})
    input_url: str = field(metadata={'json_key': 'rtspUrl'})
    output_url: str = field(metadata={'json_key': 'recognitionUrl'})

    model_relations: List[ModelInfo] = field(metadata={'json_key': 'algoModels'})
    scene_relation : SceneInfo = field(metadata={'json_key': 'sceneRelation'})

    recognition_interval: int = field(metadata={'json_key': 'recognitionInterval'}, default=-1)
    alarm_interval: int = field(metadata={'json_key': 'alarmInterval'}, default=-1)
    mode_type: int = field(metadata={'json_key': 'type'}, default=-1)
        

INFO_SOURCE = 'mock' # mock/server

import json
import requests
from constants import INFO_QUERY_URI, SERVER_BASE_URL
from http_tool import HttpTool

from global_logger import logger



def query_device_info(box_sn):
    if INFO_SOURCE == 'mock':
        # file_path = './mock/device_server.json'
        file_path = './mock/device_server_scene.json'
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            device_list = []
            for item in data:
                device = DeviceInfo.from_dict(item)
                device_list.append(device)
            return device_list
        except FileNotFoundError as e:
            logger.warning(f"File not found: {file_path}")
            return []
        except json.JSONDecodeError as e:
            logger.warning("Error decoding JSON file.")
            return []
    else:
        url = f"{SERVER_BASE_URL}{INFO_QUERY_URI}"
        params = {'code': box_sn}
        try:
            http_tool = HttpTool(box_sn=box_sn)
            res_data = http_tool.get(url, params=params, need_token=True)
            if res_data:
                logger.debug(json.dumps(res_data, ensure_ascii=False))
                device_list = []
                data = res_data['data']
                for item in data:
                    device = DeviceInfo.from_dict(item)
                    device_list.append(device)
                return device_list
            else:
                logger.error(f"fail to query device info")
                return []
        except requests.RequestException as e:
            logger.error(f"HTTP request error: {e}")
            return []