Newer
Older
lynxi-casic-demo / info_query.py

class BaseInfo:
    """Base class for plain records whose attributes can be filled from a dict."""

    def from_dict(self, data: dict):
        """Recursively populate this object's existing attributes from ``data``.

        Only keys that already exist as attributes on the object are applied;
        every other key is ignored. For each applied key:

        * if the current attribute is a ``BaseInfo``, ``value`` is handed to
          that object's own ``from_dict``;
        * if the current attribute is a list and ``value`` is a non-empty list
          made up entirely of dicts, each dict is converted into an instance
          of the class returned by ``_get_list_type(key)``; when that method
          returns ``None`` the list is stored unconverted;
        * otherwise ``value`` is assigned as-is.

        Args:
            data: Mapping of attribute names to raw values.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            NotImplementedError: If a list of dicts must be converted and the
                subclass does not override ``_get_list_type``.
        """
        for key, value in data.items():
            if not hasattr(self, key):
                continue
            current = getattr(self, key)
            # Nested record: let it populate itself.
            if isinstance(current, BaseInfo):
                current.from_dict(value)
                continue
            # List of child records: convert only when the payload really is
            # a list of dicts, so a malformed value cannot raise on value[0].
            if isinstance(current, list) and self._is_dict_list(value):
                obj_type = self._get_list_type(key)
                # A subclass may return None for keys it has no type for;
                # keep the raw dicts rather than calling None().
                if obj_type is not None:
                    value = [obj_type().from_dict(item) for item in value]
            setattr(self, key, value)
        return self

    @staticmethod
    def _is_dict_list(value) -> bool:
        """Return True when ``value`` is a non-empty list containing only dicts."""
        return (
            isinstance(value, list)
            and bool(value)
            and all(isinstance(item, dict) for item in value)
        )

    def _get_list_type(self, key):
        """Return the class used to build the elements of list attribute ``key``.

        Subclasses that hold lists of child records must override this.
        """
        raise NotImplementedError("必须在子类中实现 _get_list_type 方法")

    def __str__(self):
        """Return ``ClassName(attr=value, ...)`` covering all instance attributes."""
        fields = ', '.join(f'{k}={v}' for k, v in vars(self).items())
        return f"{self.__class__.__name__}({fields})"

    def __repr__(self):
        """Mirror ``__str__`` so records nested inside lists print readably."""
        return self.__str__()

class DeviceInfo(BaseInfo):
    """Record holding the settings of a single device.

    Every field starts at a placeholder default. The annotations mirror the
    types of those defaults; ``from_dict`` assigns payload values without
    validating them.
    """

    def __init__(self) -> None:
        # Identity fields.
        self.device_id: int = 0
        self.device_no: int = 0
        self.device_type: str = ''
        # URL fields.
        self.input_url: str = ''
        self.output_url: str = ''
        # Numeric settings; -1 presumably means "not configured" — TODO confirm.
        self.recognition_interval: int = -1
        self.alarm_interval: int = -1
        self.mode_type: int = -1

class ModelObject(BaseInfo):
    """Record describing one object entry attached to a model.

    The annotations mirror the types of the defaults; ``from_dict`` assigns
    payload values without validating them.
    """

    def __init__(self) -> None:
        # Code/name fields.
        self.model_code: str = ''
        self.object_code: str = ''
        self.object_name: str = ''
        # Threshold fields.
        self.conf_threshold: float = 0.5
        self.alarm_threshold: int = 0
        # No default; stays None unless the payload supplies a value.
        self.range = None

class ModelInfo(BaseInfo):
    """Record describing one model together with its ``ModelObject`` entries."""

    def __init__(self) -> None:
        self.model_code: str = ''
        self.model_name: str = ''
        self.model_path: str = ''
        self.handle_task: str = ''
        # Filled by from_dict with ModelObject instances built from dicts.
        self.objects: list = []

    def _get_list_type(self, key):
        """Return the element class for list attribute ``key``.

        Only ``"objects"`` is known (``ModelObject``); any other key yields
        ``None``.
        """
        return ModelObject if key == "objects" else None


# Data source selector used by the query functions in this module:
# 'mock' reads the local ./mock/*.json files; any other value (e.g. 'server')
# makes them send HTTP requests to HTTP_SERVER instead.
INFO_SOURCE = 'mock' # server
# Base URL that the endpoint paths in this module are appended to.
HTTP_SERVER = 'http://192.168.83.42:6909'

import json
import requests
import subprocess
from http_tool import HttpTool

def get_box_sn():
    """Return the serial number printed by the ``mcutools -s`` command.

    Returns:
        The command's stdout with surrounding whitespace stripped when it
        exits with code 0; an empty string when it exits non-zero or cannot
        be run at all (the failure is printed).
    """
    # Argument list without a shell: the command is constant, so no shell
    # parsing is needed and nothing is ever interpreted by /bin/sh.
    command = ["mcutools", "-s"]
    try:
        result = subprocess.run(
            command,
            capture_output=True,     # capture stdout and stderr
            text=True,               # decode output to str instead of bytes
        )
        if result.returncode != 0:
            print(f' "error": {result.stderr.strip()}, "returncode": {result.returncode}')
            return ''
        return result.stdout.strip()
    except Exception as e:
        # Deliberately best-effort: a missing binary or undecodable output
        # must not crash the caller, it just yields an empty serial number.
        print(e)
        return ''

def query_device_info(box_sn):
    """Load the devices that belong to a box.

    When ``INFO_SOURCE`` is ``'mock'`` the devices are read from
    ``./mock/device.json`` (``box_sn`` is not used); otherwise they are
    requested from ``HTTP_SERVER`` at ``/device/list`` with ``box_sn`` sent
    as the ``box_code`` query parameter.

    Args:
        box_sn: Serial number of the box.

    Returns:
        A list of ``DeviceInfo`` objects, one per entry of the payload.
        An empty list is returned (and the reason printed) when the file is
        missing, the JSON is invalid, the request fails, the status code is
        not 200, or the payload is not a list.
    """
    if INFO_SOURCE == 'mock':
        file_path = './mock/device.json'
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            return []
        except json.JSONDecodeError:
            print("Error decoding JSON file.")
            return []
    else:
        url = HTTP_SERVER + "/device/list"
        try:
            # Let requests build the query string so the value is URL-encoded.
            response = requests.get(url, params={"box_code": box_sn}, timeout=3)
            if response.status_code != 200:
                print(f"HTTP request failed with status code {response.status_code}")
                return []
            data = response.json()
        except requests.RequestException as e:
            print(f"HTTP request error: {e}")
            return []
        except ValueError as e:
            # Older requests versions raise a plain ValueError for a body
            # that is not valid JSON.
            print(f"Error decoding JSON response: {e}")
            return []

    # Iterating a dict would feed its string keys to from_dict and crash.
    if not isinstance(data, list):
        print(f"Unexpected device payload (expected a list): {type(data).__name__}")
        return []
    return [DeviceInfo().from_dict(item) for item in data]

def query_device_model(device_id):
    """Load the models configured for a device.

    When ``INFO_SOURCE`` is ``'mock'`` the models are read from
    ``./mock/device_model.json`` and only entries whose ``"device_id"``
    equals ``device_id`` are kept; otherwise they are requested from
    ``HTTP_SERVER`` at ``/model/list`` with ``device_id`` sent as the
    ``deviceId`` query parameter and every returned entry is used.

    Args:
        device_id: Identifier of the device whose models are wanted.

    Returns:
        A list of ``ModelInfo`` objects. An empty list is returned (and the
        reason printed) when the file is missing, the JSON is invalid, the
        request fails, the status code is not 200, or the payload is not a
        list.
    """
    use_mock = INFO_SOURCE == 'mock'
    if use_mock:
        file_path = './mock/device_model.json'
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            return []
        except json.JSONDecodeError:
            print("Error decoding JSON file.")
            return []
    else:
        url = HTTP_SERVER + "/model/list"
        try:
            # Let requests build the query string so the value is URL-encoded.
            response = requests.get(url, params={"deviceId": device_id}, timeout=3)
            if response.status_code != 200:
                print(f"HTTP request failed with status code {response.status_code}")
                return []
            data = response.json()
        except requests.RequestException as e:
            print(f"HTTP request error: {e}")
            return []
        except ValueError as e:
            # Older requests versions raise a plain ValueError for a body
            # that is not valid JSON.
            print(f"Error decoding JSON response: {e}")
            return []

    # Iterating a dict would feed its string keys to from_dict and crash.
    if not isinstance(data, list):
        print(f"Unexpected model payload (expected a list): {type(data).__name__}")
        return []

    if use_mock:
        # The mock file is filtered locally; entries without a "device_id"
        # key are skipped instead of raising KeyError.
        data = [
            item for item in data
            if isinstance(item, dict)
            and "device_id" in item
            and item["device_id"] == device_id
        ]
    return [ModelInfo().from_dict(item) for item in data]
        
def alarm_upload(data):
    """POST ``data`` to the ``/safe-server/record/upload`` endpoint of ``HTTP_SERVER``.

    A fresh ``HttpTool`` is created for each call; whatever its ``post``
    returns is discarded, so this function returns ``None``.
    """
    HttpTool().post(HTTP_SERVER + '/safe-server/record/upload', data)