Newer
Older
lynxi-casic-demo / common.py
zhangyingjie on 24 Jan 4 KB 增加后台接口调用
# -*- coding: utf-8 -*-
"""
 * @file common.py
 * @author SDK_TEAM
 * @brief
 * @version 0.1
 * @date 2022-11-3
 * Copyright:
 * © 2018 北京灵汐科技有限公司 版权所有。
 * 注意:以下内容均为北京灵汐科技有限公司原创,未经本公司允许,不得转载,否则将视为侵权;对于不遵守此声明或者其他违法使用以下内容者,本公司依法保留追究权。
 * © 2018 Lynxi Technologies Co., Ltd. All rights reserved.
 * NOTICE: All information contained here is, and remains the property of Lynxi. This file can not
 * be copied or distributed without the permission of Lynxi Technologies Co., Ltd.
"""
import os
from datetime import datetime
import subprocess
import threading
import time
from global_logger import logger
import pylynchipsdk as sdk


def error_check(condition, log):
    """Log an error entry when ``condition`` is truthy.

    The helper only reports the problem through ``logger``; it neither
    raises an exception nor terminates the process.

    Args:
        condition: Value tested for truthiness; truthy means an error
            occurred and must be reported.
        log: Description of the error, interpolated into the log entry.
    """
    if not condition:
        return
    timestamp = datetime.now()
    logger.error(f"\n****** {timestamp} ERROR: {log}")


###
# Error check that logs the error together with the code line that reported it
###
def error_check_ex(condition, code_line, log):
    """Log an error entry, including its code line, when ``condition`` is truthy.

    The helper only reports the problem through ``logger``; it neither
    raises an exception nor terminates the process.

    Args:
        condition: Value tested for truthiness; truthy means an error
            occurred and must be reported.
        code_line: Location of the failing call, written after
            ``error_code_line:`` in the log entry.
        log: Description of the error, interpolated into the log entry.
    """
    if not condition:
        return
    timestamp = datetime.now()
    logger.error(
        f"\n****** {timestamp}  error_code_line: {code_line}  ERROR: {log}"
    )


def default_stream_error_handle(stream, errorMsg, params):
    """Log every field of a stream error report.

    Each value is interpolated into the message text before it is handed
    to the logger. The message prefixes contain no format placeholders, so
    passing the values as extra positional logger arguments would leave
    them out of the emitted log line.

    Args:
        stream: Identifier of the stream the error belongs to.
        errorMsg: Error description object; its ``errCode``, ``errMsg``,
            ``errModule`` and ``errFunction`` attributes are logged.
        params: Extra data supplied with the error, logged as-is.
    """
    # NOTE(review): the signature looks like an SDK error-callback
    # signature -- confirm against the place where it is registered.
    details = (
        ("******* streamID : ", stream),
        ("******* errorMsg.errCode : ", errorMsg.errCode),
        ("******* errorMsg.errMsg : ", errorMsg.errMsg),
        ("******* errorMsg.errModule : ", errorMsg.errModule),
        ("******* errorMsg.errFunction : ", errorMsg.errFunction),
        ("*******   params : ", params),
    )
    for prefix, value in details:
        logger.error(f"{prefix}{value}")


def check_device_id(device_id):
    """Check that ``device_id`` is a valid device index.

    The ID is valid when it lies in the half-open range
    ``[0, device_count)``, where the count comes from
    ``sdk.lyn_get_device_count()``.

    Args:
        device_id: Device index to validate.

    Returns:
        bool: True when the ID is inside the range; False otherwise, in
        which case an error is also logged.
    """
    # NOTE(review): the status code returned alongside the count is not
    # inspected -- confirm whether a failed query should reject the ID.
    device_num, _ret = sdk.lyn_get_device_count()
    # An ID is out of range when it is negative OR not below the count;
    # requiring both at once can never hold for a non-negative count.
    if 0 <= device_id < device_num:
        return True
    logger.error(
        "device_id {} invalid, should in range [0, {})".format(
            device_id, device_num
        )
    )
    return False


def get_filename_without_ext(path):
    """Return the final component of ``path`` with its last extension removed.

    Only the last extension is dropped, so ``archive.tar.gz`` becomes
    ``archive.tar``.
    """
    base_name = os.path.basename(path)
    stem, _extension = os.path.splitext(base_name)
    return stem


# State shared by successive print_record() calls. It is initialised here
# so that the first call does not fail on an undefined global.
last_second = None  # seconds field of the clock at the last counter reset
fps = 0  # calls counted since the last counter reset


def print_record():
    """Count one call and log the call count whenever the clock second changes.

    Every call increments the module-level ``fps`` counter. When the
    seconds field of the current local time differs from the one stored in
    ``last_second``, the count accumulated so far is logged together with
    an ``HH:MM:SS`` timestamp and the counter restarts. The very first
    call only records the current second, because no interval has been
    measured yet.
    """
    global last_second
    global fps
    now = datetime.now()
    current_second = now.second
    if last_second is None:
        # First call: start the first counting interval without logging.
        last_second = current_second
    elif last_second != current_second:
        current_time = now.strftime("%H:%M:%S")
        logger.info(f"{current_time} fps: {fps}  ")
        fps = 0
        last_second = current_second
    fps += 1


# Thread-local storage: each thread that calls print_frame_rate() keeps its
# own frame counter and its own window start time.
record = threading.local()


def print_frame_rate(channel_name: str) -> None:
    """Count one frame and log the frame rate once the window has elapsed.

    The counter and the window start are stored on the thread-local
    ``record`` object. When at least 10 seconds have passed since the
    window started, the frame count divided by 10.0 is logged for
    ``channel_name`` and a new window begins.

    Args:
        channel_name: Label written into the log entry.
    """
    frame_count = getattr(record, 'fps', 0) + 1
    record.fps = frame_count
    now = time.time()
    if not hasattr(record, 'last_second'):
        # First frame seen by this thread: the window starts now.
        record.last_second = now
    window_start = record.last_second
    if now - window_start < 10:
        return
    logger.info(f'channel {channel_name}, fps {frame_count/10.0}')
    record.last_second = time.time()
    record.fps = 0


def append_text_to_filename(relative_path, text_to_append):
    """Insert ``text_to_append`` between the file name and its last extension.

    The directory part of ``relative_path`` is kept, and the text is
    placed directly before the last extension of the final component, or
    at its end when there is no extension.

    Args:
        relative_path: Path whose file name is to be extended.
        text_to_append: Text inserted before the extension.

    Returns:
        The path with the modified file name.
    """
    parent, name = os.path.split(relative_path)
    stem, suffix = os.path.splitext(name)
    return os.path.join(parent, stem + text_to_append + suffix)


def change_file_extension(file_path: str, new_ext: str) -> str:
    """Replace the last extension of ``file_path`` with ``new_ext``.

    ``new_ext`` is appended verbatim to the path without its last
    extension, so it must carry its own leading dot if one is wanted.

    Args:
        file_path: Path whose extension is to be replaced.
        new_ext: Text appended in place of the old extension.

    Returns:
        The path with the new extension.
    """
    stem = os.path.splitext(file_path)[0]
    return stem + new_ext

def get_box_sn():
    """Return the serial number printed by the ``mcutools -s`` command.

    The command is run through the shell with its output captured as
    text. On exit code 0 the standard output is stripped and every
    ``"SN: "`` occurrence is removed. On a non-zero exit code, or when
    any exception is raised, the failure is logged and an empty string is
    returned.

    Returns:
        str: The serial number text, or ``''`` on failure.
    """
    command = "mcutools -s"
    try:
        # Run the command.
        completed = subprocess.run(
            command,
            shell=True,              # execute the command through the shell
            capture_output=True,     # capture standard output and standard error
            text=True                # treat the output as text rather than bytes
        )
        # A non-zero return code means the tool reported a failure.
        if completed.returncode != 0:
            logger.error('fail get box sn:')
            logger.error(f' "error": {completed.stderr.strip()}, "returncode": {completed.returncode}')
            return ''
        serial = completed.stdout.strip()
        return serial.replace("SN: ", "").strip()
    except Exception as e:
        logger.exception(f'fail get box sn: {e}')
        return ''