# Newer / Older
# lynxi-casic-demo / info_query copy.py
# zhangyingjie on 24 Jan, 9 KB: 增加后台接口调用 (add backend API calls)

from typing import Optional, get_type_hints


class BaseInfo:
    """Base class for data objects that are populated from plain dicts.

    Subclasses create their attributes (with default values) in ``__init__``
    and may override ``field_mapping`` to translate incoming keys into
    attribute names.

    Nested objects: an attribute whose default is ``None`` is turned into a
    ``BaseInfo`` subclass instance when its type can be discovered, either
    from a class-level annotation (``child: Optional[Child] = None``), from an
    ``__init__`` parameter annotation, or by overriding ``_get_nested_type``.
    Annotations written on ``self.<attr>`` assignments inside a method body
    are not recorded by Python and therefore cannot be discovered.
    """

    # Maps keys of the incoming dict to attribute names; subclasses may override.
    field_mapping = {}

    def from_dict(self, data: dict):
        """Recursively populate this object from *data* and return ``self``.

        Keys are translated through ``field_mapping``; keys that do not match
        an existing attribute are ignored. ``None`` or an empty dict leaves
        the object unchanged.

        Raises:
            TypeError: if a list of dicts arrives for a list attribute and
                ``_get_list_type`` returns ``None`` for it.
            NotImplementedError: if such a list arrives and the subclass does
                not implement ``_get_list_type``.
        """
        if not data:
            # Nothing to apply; this also makes a null nested payload a no-op.
            return self

        for key, value in data.items():
            attr_key = self.field_mapping.get(key, key)
            if not hasattr(self, attr_key):
                continue

            attr = getattr(self, attr_key)

            # A nested object that is still None is created on demand, but
            # only when there is actually a dict to load into it.
            if attr is None and isinstance(value, dict):
                nested_type = self._get_nested_type(attr_key)
                if nested_type is not None:
                    attr = nested_type()
                    setattr(self, attr_key, attr)

            if isinstance(attr, BaseInfo):
                attr.from_dict(value)
            elif (
                isinstance(attr, list)
                and isinstance(value, (list, tuple))
                and value
                and isinstance(value[0], dict)
            ):
                obj_type = self._get_list_type(attr_key)
                if obj_type is None:
                    raise TypeError(
                        f"{type(self).__name__}._get_list_type returned no type "
                        f"for list attribute '{attr_key}'"
                    )
                setattr(self, attr_key, [obj_type().from_dict(item) for item in value])
            else:
                setattr(self, attr_key, value)
        return self

    def _get_nested_type(self, key):
        """Return the ``BaseInfo`` subclass declared for attribute *key*, or ``None``.

        Looks at ``__init__`` parameter annotations and class-level
        annotations (the latter take precedence). ``Optional[T]`` and
        ``T | None`` are unwrapped to ``T``.
        """
        hints = {}
        for source in (self.__init__, type(self)):
            try:
                hints.update(get_type_hints(source))
            except (NameError, TypeError):
                # An unresolvable forward reference or malformed annotation
                # must not break loading; the value is then stored as-is.
                continue

        declared = hints.get(key)
        args = getattr(declared, '__args__', None) or ()
        if type(None) in args:
            candidates = [arg for arg in args if arg is not type(None)]
        else:
            candidates = [declared]

        for candidate in candidates:
            try:
                if issubclass(candidate, BaseInfo):
                    return candidate
            except TypeError:
                # Not a class (e.g. None or a generic alias such as List[T]).
                continue
        return None

    def _get_list_type(self, key):
        """Return the class used for the items of list attribute *key*."""
        raise NotImplementedError("必须在子类中实现 _get_list_type 方法")

    def __str__(self):
        """Return ``ClassName(attr=value, ...)`` for all instance attributes."""
        attributes = vars(self)
        return f"{self.__class__.__name__}({', '.join(f'{k}={v}' for k, v in attributes.items())})"

    # Without this, nested objects inside lists print as bare object addresses.
    __repr__ = __str__

class DeviceInfo(BaseInfo):
    """Device record loaded from a dict whose keys follow ``field_mapping``."""

    field_mapping = {
        'deviceId': 'device_id',
        'cameraIndexCode': 'device_no',
        'rtspUrl': 'input_url',
        'recognitionUrl': 'output_url',
        'recognitionInterval': 'recognition_interval',
        'alarmInterval': 'alarm_interval',
        'type': 'mode_type',
        'algoModels': 'model_relations',
        'sceneRelation': 'scene_relation',
    }

    def __init__(self) -> None:
        self.device_id = 0
        self.device_no = 0
        self.input_url = ''
        self.output_url = ''
        self.recognition_interval = -1
        self.alarm_interval = -1
        self.mode_type = -1
        # NOTE(review): model_relations is not created here, so 'algoModels'
        # entries are skipped by from_dict (it only fills existing
        # attributes) - confirm this is intended.
        # self.model_relations = []
        self.scene_relation: Optional[SceneInfo] = None

    def from_dict(self, data: dict):
        """Populate the device from *data* and return ``self``.

        The annotation on ``self.scene_relation`` lives inside ``__init__``
        and is therefore not available at runtime, so the generic loader
        cannot discover the nested type and stores the raw dict. It is
        converted to a ``SceneInfo`` explicitly here.
        """
        super().from_dict(data)
        if isinstance(self.scene_relation, dict):
            self.scene_relation = SceneInfo().from_dict(self.scene_relation)
        return self

    def _get_list_type(self, key):
        """Return the item class for list attribute *key*."""
        if key == "model_relations":
            return ModelInfo
        # Unknown list attributes get the base class's explicit error instead
        # of an implicit None that fails later when it is called.
        return super()._get_list_type(key)


class ModelObject(BaseInfo):
    """Entry of a model's ``objects`` list, loaded via ``BaseInfo.from_dict``."""

    # Incoming key -> attribute name.
    field_mapping = {
        "algoModelId": "model_id",
        "code": "model_code",
        "typeCode": "object_code",
        "remark": "object_name",
        "isUse": "is_use",
        "confidenceLevel": "conf_threshold",
        "threshold": "alarm_threshold",
        "boundary": "range",
    }

    def __init__(self) -> None:
        # Identification
        self.model_id = 0
        self.model_code = ""
        self.object_code = ""
        self.object_name = ""
        # Settings and their defaults
        self.is_use = 0
        self.conf_threshold = 0.5
        self.alarm_threshold = 0
        self.range = None

class ModelInfo(BaseInfo):
    """Model record; its ``objects`` list holds ``ModelObject`` items."""

    # Incoming key -> attribute name.
    field_mapping = {
        "id": "model_id",
        "code": "model_code",
        "modelName": "model_name",
        "path": "model_path",
        "handleTask": "handle_task",
        "remark": "remark",
        "subObjects": "objects",
        "version": "version",
    }

    def __init__(self) -> None:
        self.model_id = 0
        self.model_code = ""
        self.model_name = ""
        self.model_path = ""
        self.handle_task = ""
        self.remark = ""
        self.version = ""
        # Replaced by a list of ModelObject when 'subObjects' is loaded.
        self.objects = []

    def _get_list_type(self, key):
        """Return the item class for list attribute *key* (None if unknown)."""
        return ModelObject if key == "objects" else None
        
class SceneInfo(BaseInfo):
    """Scene record loaded from a dict whose keys follow ``field_mapping``."""

    # Incoming key -> attribute name.
    field_mapping = {
        "sceneId": "scene_id",
        "sceneCode": "scene_code",
        "handleTask": "handle_task",
        "boundary": "boundary",
        "remark": "remark",
        "version": "version",
    }

    def __init__(self) -> None:
        self.scene_id = 0
        self.scene_code = ""
        self.handle_task = ""
        self.boundary = ""
        self.remark = ""
        self.version = ""


# Data source switch read by the query_* functions in this file: 'mock' loads
# the JSON files under ./mock/; any other value sends HTTP requests.
INFO_SOURCE = 'server' # 'mock' or 'server'
# Base URL that query_device_model prepends to its /model/list request.
HTTP_SERVER = 'http://192.168.83.42:6909'

import json
import requests
from constants import INFO_QUERY_URI, SERVER_BASE_URL
from http_tool import HttpTool



def query_device_info(box_sn):
    """Return the list of ``DeviceInfo`` objects for the box *box_sn*.

    With ``INFO_SOURCE == 'mock'`` the devices are read from
    ``./mock/device.json``. Otherwise they are requested through ``HttpTool``
    from ``SERVER_BASE_URL + INFO_QUERY_URI`` with ``code=box_sn``, and the
    devices are taken from the ``data`` field of the response.

    Returns:
        A list of ``DeviceInfo``; an empty list when the file is missing or
        invalid, the request fails, or the response carries no ``data`` list.
    """
    if INFO_SOURCE == 'mock':
        file_path = './mock/device.json'
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            return []
        except json.JSONDecodeError:
            print("Error decoding JSON file.")
            return []
        return [DeviceInfo().from_dict(item) for item in data]

    url = f"{SERVER_BASE_URL}{INFO_QUERY_URI}"
    params = {'code': box_sn}
    try:
        http_tool = HttpTool(box_sn=box_sn)
        res_data = http_tool.get(url, params=params, need_token=True)
    except requests.RequestException as e:
        print(f"HTTP request error: {e}")
        return []

    if not res_data:
        print("fail to query device info")
        return []

    print(json.dumps(res_data, ensure_ascii=False))
    # A response without 'data', or with a null/non-list 'data', used to raise
    # KeyError/TypeError here; treat it like any other failed query.
    data = res_data.get('data') if isinstance(res_data, dict) else None
    if not isinstance(data, list):
        print("fail to query device info: response has no 'data' list")
        return []
    return [DeviceInfo().from_dict(item) for item in data]

def query_device_model(device_id):
    """Return the list of ``ModelInfo`` objects for the device *device_id*.

    With ``INFO_SOURCE == 'mock'`` the models are read from
    ``./mock/device_model.json`` and filtered on each entry's ``device_id``.
    Otherwise ``GET {HTTP_SERVER}/model/list?deviceId=<device_id>`` is sent
    with a 3 second timeout and the JSON body is expected to be a list.

    Returns:
        A list of ``ModelInfo``; an empty list when the file is missing or
        invalid, the request fails, the status is not 200, or the body is not
        a JSON list.
    """
    if INFO_SOURCE == 'mock':
        file_path = './mock/device_model.json'
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            return []
        except json.JSONDecodeError:
            print("Error decoding JSON file.")
            return []
        return [
            ModelInfo().from_dict(item)
            for item in data
            if item["device_id"] == device_id
        ]

    # NOTE(review): this uses the hard-coded HTTP_SERVER and no token, unlike
    # query_device_info (SERVER_BASE_URL via HttpTool) - confirm intended.
    url = f"{HTTP_SERVER}/model/list"
    try:
        # Let requests build (and URL-encode) the query string.
        response = requests.get(url, params={'deviceId': device_id}, timeout=3)
        if response.status_code != 200:
            print(f"HTTP request failed with status code {response.status_code}")
            return []
        data = response.json()
    except requests.RequestException as e:
        print(f"HTTP request error: {e}")
        return []
    except ValueError as e:
        # Older requests versions raise a plain ValueError for a non-JSON body.
        print(f"Error decoding JSON response: {e}")
        return []

    if not isinstance(data, list):
        print("fail to query device model: response is not a list")
        return []
    return [ModelInfo().from_dict(item) for item in data]
        

if __name__ == "__main__":
    # Manual check of which type hints can be discovered at runtime.
    device = DeviceInfo()

    # An annotation written on a ``self.<attr>`` assignment inside __init__
    # is not stored, so the hints looked up below do not contain
    # scene_relation (DeviceInfo.__annotations__ would not include it either).
    print(vars(device))
    hints = get_type_hints(DeviceInfo.__init__)
    print(hints, hints.get('scene_relation'), sep='\n')