# -*- coding: utf-8 -*-
"""
 * @file common.py
 * @author SDK_TEAM
 * @brief Shared helpers: error logging, device-id validation, frame-rate
 *        reporting, path manipulation and box serial-number lookup.
 * @version 0.1
 * @date 2022-11-3
 * Copyright:
 * © 2018 北京灵汐科技有限公司 版权所有。
 * 注意:以下内容均为北京灵汐科技有限公司原创,未经本公司允许,不得转载,否则将视为侵权;对于不遵守此声明或者其他违法使用以下内容者,本公司依法保留追究权。
 * © 2018 Lynxi Technologies Co., Ltd. All rights reserved.
 * NOTICE: All information contained here is, and remains the property of Lynxi. This file can not
 * be copied or distributed without the permission of Lynxi Technologies Co., Ltd.
"""

import os
from datetime import datetime
import subprocess
import threading
import time

from global_logger import logger
import pylynchipsdk as sdk


def error_check(condition, log):
    """Log ``log`` as an error when ``condition`` is truthy.

    The function only logs; it neither raises nor terminates the process.

    Args:
        condition: Any value; a truthy value means "an error occurred".
        log: Message (or object) describing the error.
    """
    if condition:
        logger.error("\n****** {} ERROR: {}".format(datetime.now(), log))


def error_check_ex(condition, code_line, log):
    """Log ``log`` together with a source location when ``condition`` is truthy.

    Same as :func:`error_check` but the message also carries ``code_line``.
    The function only logs; it neither raises nor terminates the process.

    Args:
        condition: Any value; a truthy value means "an error occurred".
        code_line: Identifier of the call site, included verbatim in the log.
        log: Message (or object) describing the error.
    """
    if condition:
        logger.error(
            "\n****** {} error_code_line: {} ERROR: {}".format(
                datetime.now(), code_line, log
            )
        )


def default_stream_error_handle(stream, errorMsg, params):
    """Log every field of a stream error report.

    Args:
        stream: Stream identifier/handle the error belongs to.
        errorMsg: Object exposing ``errCode``, ``errMsg``, ``errModule`` and
            ``errFunction`` attributes.
        params: Extra user data passed along with the error; logged as-is.
    """
    # The values are formatted into the message itself: passing them as extra
    # positional arguments to a message without placeholders either drops
    # them or triggers a formatting error, depending on the logger backend.
    logger.error("******* streamID : {}".format(stream))
    logger.error("******* errorMsg.errCode : {}".format(errorMsg.errCode))
    logger.error("******* errorMsg.errMsg : {}".format(errorMsg.errMsg))
    logger.error("******* errorMsg.errModule : {}".format(errorMsg.errModule))
    logger.error(
        "******* errorMsg.errFunction : {}".format(errorMsg.errFunction)
    )
    logger.error("******* params : {}".format(params))


def check_device_id(device_id):
    """Return whether ``device_id`` lies in ``[0, device_count)``.

    The device count is queried from the SDK on every call. An out-of-range
    id is reported through the logger.

    Args:
        device_id: Zero-based device index to validate.

    Returns:
        ``True`` if the id is valid, ``False`` otherwise.
    """
    # NOTE(review): ``ret`` is presumably an SDK status code and is not
    # checked here — confirm whether a failed query should be reported.
    device_num, ret = sdk.lyn_get_device_count()
    # ``or`` is required: an id is invalid when it violates *either* bound
    # (with ``and`` the condition could never be true).
    if device_id < 0 or device_id >= device_num:
        logger.error(
            "device_id {} invalid, should in range [0, {})".format(
                device_id, device_num
            )
        )
        return False
    return True


def get_filename_without_ext(path):
    """Return the final component of ``path`` with its last extension removed."""
    return os.path.splitext(os.path.basename(path))[0]


# State shared by print_record(): the wall-clock second seen by the previous
# call (None until the first call) and the number of calls counted since the
# last report.
last_second = None
fps = 0


def print_record():
    """Count one frame and log the per-second frame count.

    Each call counts as one frame. Whenever the wall-clock second differs
    from the one seen by the previous call, the number of frames counted so
    far is logged and the counter restarts. State lives in the module-level
    ``last_second`` / ``fps`` globals, so all callers share one counter.
    """
    global last_second
    global fps
    current_second = datetime.now().second
    if last_second is None:
        # First call: start counting from the current second without logging.
        last_second = current_second
    if last_second != current_second:
        current_time = time.strftime("%H:%M:%S", time.localtime())
        logger.info(f"{current_time} fps: {fps} ")
        fps = 0
        last_second = current_second
    fps += 1


# Per-thread frame counter and window start time used by print_frame_rate().
record = threading.local()

# Minimum length, in seconds, of one print_frame_rate() reporting window.
_FRAME_RATE_REPORT_INTERVAL_S = 10


def print_frame_rate(channel_name: str) -> None:
    """Count one frame for the calling thread and periodically log its rate.

    Counters are kept in thread-local storage, so each thread reports its own
    rate. Once at least ten seconds have passed since the window started, the
    average rate over the elapsed time is logged and a new window begins.

    Args:
        channel_name: Label identifying the channel in the log line.
    """
    record.fps = getattr(record, 'fps', 0) + 1
    now = time.time()
    record.last_second = getattr(record, 'last_second', now)
    elapsed = now - record.last_second
    if elapsed >= _FRAME_RATE_REPORT_INTERVAL_S:
        # Divide by the real window length: calls may arrive late, so the
        # window can be noticeably longer than the nominal interval.
        logger.info(
            f'channel {channel_name}, fps {record.fps / elapsed:.1f}'
        )
        record.last_second = now
        record.fps = 0


def append_text_to_filename(relative_path, text_to_append):
    """Insert ``text_to_append`` between the file stem and its extension.

    Args:
        relative_path: Path whose file name should be modified.
        text_to_append: Text placed right before the last extension.

    Returns:
        The path with the same directory part and the modified file name.
    """
    directory, filename = os.path.split(relative_path)
    stem, extension = os.path.splitext(filename)
    return os.path.join(directory, stem + text_to_append + extension)


def change_file_extension(file_path: str, new_ext: str) -> str:
    """Return ``file_path`` with its last extension replaced by ``new_ext``.

    Args:
        file_path: Original path.
        new_ext: Replacement extension, appended verbatim (include the
            leading dot if one is wanted).
    """
    stem, _ = os.path.splitext(file_path)
    return stem + new_ext


def get_box_sn():
    """Return the box serial number reported by ``mcutools -s``.

    The tool's standard output is stripped and the ``"SN: "`` label removed.

    Returns:
        The serial number string, or ``''`` if the tool could not be run or
        exited with a non-zero status (the failure is logged).
    """
    # Fixed argument list, executed directly: no shell is needed.
    command = ["mcutools", "-s"]
    try:
        result = subprocess.run(
            command,
            capture_output=True,  # capture both stdout and stderr
            text=True,  # decode output to str instead of bytes
        )
    except Exception as e:
        # Best effort: a missing tool or undecodable output must not crash
        # the caller.
        logger.exception(f'fail get box sn: {e}')
        return ''

    if result.returncode == 0:
        return result.stdout.strip().replace("SN: ", "").strip()

    logger.error('fail get box sn:')
    logger.error(f' "error": {result.stderr.strip()}, "returncode": {result.returncode}')
    return ''