import base64
import importlib
from blockqueue import block_queue
from callback_data_struct import *
from constants import ALARM_URI, SERVER_BASE_URL
from detect_utils import bbox, Box, SubObject, SubClass, free_to_pool_callback, recv_frame_cb, set_padding_data
from http_tool import HttpTool
import common
import bufferpool
from global_logger import logger
import math
import queue
import sys, os
from types import SimpleNamespace
from typing import List, Tuple
import time
import numpy as np
import pylynchipsdk as sdk
import copy
import threading
import multiprocessing
import signal
import json
import ctypes
from ctypes import *
import struct
import subprocess
import argparse
from string_utils import camel_to_snake, get_class, get_fun
# import ffmpeg
try:
    import cv2
except ImportError:
    cv2 = None


def cancel_process(signum, frame):
    global cancel_flag
    cancel_flag.value = True


cancel_flag = multiprocessing.Value('b', False)
http_tool = HttpTool()


class ModelHandler:
    def __init__(self, model_path, plugin_path, model_name, model_code, handle_task, objects, image_width, image_height) -> None:
        self.model_path = model_path
        self.plugin_path = plugin_path
        self.model_name = model_name
        self.model_code = model_code
        self.handle_task = handle_task if handle_task else "base_model_handler"
        self.objects = objects
        logger.info(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}')

        self.model, ret = sdk.lyn_load_model(self.model_path)
        common.error_check(ret, "lyn_load_model")

        self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path)
        common.error_check(ret, "lyn_plugin_register")

        self.apu_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.plugin_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.apu_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.model_desc, ret = sdk.lyn_model_get_desc(self.model)
        common.error_check(ret, "lyn_model_get_desc")
        self.batch_size = self.model_desc.inputTensorAttrArray[0].batchSize
        self.model_width = self.model_desc.inputTensorAttrArray[0].dims[2]
        self.model_height = self.model_desc.inputTensorAttrArray[0].dims[1]
        self.apu_output_size = self.model_desc.outputDataLen
        self.class_num = self.model_desc.outputTensorAttrArray[0].dims[3] - 5
        self.anchor_size = self.model_desc.outputTensorAttrArray[0].dims[1]

        self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        self.sub_objects_info, ret = sdk.lyn_malloc(ctypes.sizeof(SubObject))

        sub_obj = sdk.c_malloc(ctypes.sizeof(SubObject))
        # Get the underlying raw pointer out of the PyCapsule
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        raw_ptr = ctypes.pythonapi.PyCapsule_GetPointer(sub_obj, None)
        if not raw_ptr:
            raise ValueError("Failed to extract pointer from PyCapsule")
        # Cast to ctypes.POINTER(SubObject)
        sub_obj_ptr = ctypes.cast(raw_ptr, ctypes.POINTER(SubObject))
        # Fill the structure through the ctypes pointer
        sub_obj_instance = sub_obj_ptr.contents
        sub_obj_instance.objectsnum = len(self.objects)

        for i in range(sub_obj_instance.objectsnum):
            obj = self.objects[i]
            sub_obj_instance.objects[i].object_id = int(obj.object_code)
            sub_obj_instance.objects[i].object_name = obj.object_name
            sub_obj_instance.objects[i].conf_threshold = obj.conf_threshold
            sub_obj_instance.objects[i].alarm_threshold = obj.alarm_threshold
            sub_obj_instance.objects[i].model_code = obj.model_code
            if obj.range:
                # Convert to (x1 y1 x2 y2 x3 y3 x4 y4)
                # range_values = [float(r) for r in obj.range]
                abs_range = []
                for boundary_point in obj.range:
                    abs_range.append(boundary_point.x * image_width)
                    abs_range.append(boundary_point.y * image_height)
                logger.debug(f'abs_range = {abs_range}')
                sub_obj_instance.objects[i].range = (c_float * 8)(*abs_range)
            else:
                sub_obj_instance.objects[i].range = (c_float * 8)(0, 0, 0, 0, 0, 0, 0, 0)

        ret = sdk.lyn_memcpy(
            self.sub_objects_info, sub_obj, ctypes.sizeof(SubObject), sdk.lyn_memcpy_dir_t.ClientToServer
        )
        common.error_check(ret, "save_boxinfo_cb lyn_memcpy")

        self.apu_output_mem_pool = bufferpool.buffer_pool(
            self.apu_output_size * self.batch_size, 5
        )

        logger.debug(f'batch_size = {self.batch_size}')
        logger.debug(f'model_width = {self.model_width}')
        logger.debug(f'model_height = {self.model_height}')
        logger.debug(f'apu_output_size = {self.apu_output_size}')
        logger.debug(f'class_num = {self.class_num}')
        logger.debug(f'anchor_size = {self.anchor_size}')


def alarm_upload(data):
    global http_tool
    url = f'{SERVER_BASE_URL}{ALARM_URI}'
    http_tool.post(url=url, data=data, need_token=True)


def show_video_cb(params):
    cb_data: cb_data = params[0]
    model_infos = params[1]
    last_alarm_time = params[2]
    device_no = cb_data.attr.device_no
    alarm_interval = cb_data.attr.alarm_interval
    if alarm_interval <= 0:
        return 0
    alarm_infos = []
    for model_info in model_infos:
        boxes_info = model_info.boxes_info
        model_code = model_info.model_code
        dst_img_size = ctypes.sizeof(Box)
        host_buf_arr = np.ones(dst_img_size, dtype=np.uint8)
        host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
        ret = sdk.lyn_memcpy(
            host_buf, boxes_info, dst_img_size, sdk.lyn_memcpy_dir_t.ServerToClient
        )
        common.error_check(ret, "save_boxinfo_cb lyn_memcpy")
        host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
        c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents
        # box_dict = {"boxesnum": c_box.boxesnum, "boxes": []}
        # print(c_box.boxesnum)
        bbox_dicts = []
        for i in range(c_box.boxesnum):
            bbox_instance = c_box.boxes[i]
            bbox_dict = {
                "xmin": bbox_instance.xmin,
                "ymin": bbox_instance.ymin,
                "xmax": bbox_instance.xmax,
                "ymax": bbox_instance.ymax,
                "score": bbox_instance.score,
                "id": bbox_instance.id,
                "label": bbox_instance.label,
                "alarm": bbox_instance.alarm,
                "model_code": bbox_instance.model_code,
            }
            # print(bbox_dict)
            bbox_dicts.append(bbox_dict)
        handle_task_name = model_info.handle_task
        handle_alarm_info = get_fun(f'model_handler.{handle_task_name}', 'handle_alarm_info')
        alarm_infos.extend(handle_alarm_info(bbox_dicts, last_alarm_time, device_no, alarm_interval))

    if alarm_infos:
        data = np.empty(cb_data.frame.size, dtype=np.uint8)
        data_ptr = sdk.lyn_numpy_to_ptr(data)
        ret = sdk.lyn_memcpy(
            data_ptr, cb_data.frame.data, cb_data.frame.size, sdk.lyn_memcpy_dir_t.ServerToClient,
        )
        common.error_check(ret, "lyn_memcpy")
        yuvImg = np.reshape(
            data, (cb_data.attr.height * 3 // 2, cb_data.attr.width)
        ).astype(np.uint8)
        import cv2
        rgbImg = cv2.cvtColor(yuvImg, cv2.COLOR_YUV2BGR_NV12)
        # Encode as JPEG
        _, buffer = cv2.imencode('.jpg', rgbImg)
        # Convert to Base64 and prepend the data-URI header
        image_base64 = f"data:image/jpeg;base64,{base64.b64encode(buffer).decode('utf-8')}"

        save_dir = "./saved_images"
        device_dir = os.path.join(save_dir, device_no)
        os.makedirs(device_dir, exist_ok=True)
        # Build the file name for the saved image
        timestamp = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        file_path = os.path.join(device_dir, f"{timestamp}.jpg")
        cv2.imwrite(file_path, rgbImg)

        for alarm_info in alarm_infos:
            logger.debug(alarm_info)
            alarm_info["picBase64"] = image_base64
            alarm_upload(alarm_info)
    return 0


def save_file_cb(params):
    cb_data: save_file_cb_data = params[0]
    frame_cb_data: cb_data = params[1]
    packet = cb_data.packet
    data_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(packet)
    common.error_check(ret, "lyn_enc_get_remote_packet_valid_size")
    data = np.zeros(data_size, dtype=np.byte)
    data_ptr = sdk.lyn_numpy_to_ptr(data)
    ret = sdk.lyn_memcpy(
        data_ptr, packet.data, data_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "lyn_memcpy")
    cb_data.video_frame.put([data, packet.eos])
    cb_data.recv_pool.push(packet.data)
    return ret


def show_boxinfo_cb(params):
    model_name = params[0]
    boxes_info = params[1]
    dst_img_size = ctypes.sizeof(Box)
    host_buf_arr = np.ones(dst_img_size, dtype=np.uint8)
    host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
    ret = sdk.lyn_memcpy(
        host_buf, boxes_info, dst_img_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "save_boxinfo_cb lyn_memcpy")
    host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
    c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents
    dump_box_json(model_name, c_box)
    return 0


def dump_box_json(model_name, c_box):
    box_dict = {"boxesnum": c_box.boxesnum, "boxes": []}
    for i in range(c_box.boxesnum):
        bbox_instance = c_box.boxes[i]
        bbox_dict = {
            "xmin": bbox_instance.xmin,
            "ymin": bbox_instance.ymin,
            "xmax": bbox_instance.xmax,
            "ymax": bbox_instance.ymax,
            "score": bbox_instance.score,
            "id": bbox_instance.id,
            "label": bbox_instance.label,
            "alarm": bbox_instance.alarm,
            "model_code": bbox_instance.model_code,
        }
        box_dict["boxes"].append(bbox_dict)
    logger.debug(box_dict)


class yolov5:
    def __init__(self, attr: device_process_attr) -> None:
        self.attr = ""
        self.__ctx = ""
        self.__send_thread = ""
        self.__recv_thread = ""
        self.__ipe_thread = ""
        self.__demux_hdl = ""
        self.codec_para = ""
        self.__vdec_hdl = ""
        self.__vdec_attr = sdk.lyn_vdec_attr_t()
        self.__send_queue = block_queue()
        self.__ipe_queue = block_queue()
        self.__send_num = 0
        self.__recv_num = 0

        # Create the device context
        self.attr = copy.copy(attr)
        self.video_frame = attr.video_frame
        self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id)
        self.enc_head_flag = True
        common.error_check(ret, "create context")
        ret = sdk.lyn_register_error_handler(common.default_stream_error_handle)
        common.error_check(ret, 'lyn_register_error_handler')

        # Open the demuxer
        self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
        common.error_check(ret, "lyn_demux_open ")
        self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_codec_para ")
        self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_framerate")
        logger.debug(f'fps = {self.fps}')

        # Open the decoder
        self.__vdec_attr.codec_id = self.codec_para.codec_id
        self.__vdec_attr.output_fmt = self.attr.output_fmt
        self.__vdec_attr.scale = self.attr.scale
        self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr)
        common.error_check(ret, "lyn_vdec_open ")
        self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info(
            self.codec_para, self.__vdec_attr
        )
        logger.debug(f'self.vdec_out_info.width = {self.vdec_out_info.width}')
        logger.debug(f'self.vdec_out_info.height = {self.vdec_out_info.height}')
        common.error_check(ret, "lyn_vdec_get_out_info ")
        self.attr.width = self.vdec_out_info.width
        self.attr.height = self.vdec_out_info.height

        # Create streams, events and IPE descriptors
        self.send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.ipe_stream, ret = sdk.lyn_create_stream()
common.error_check(ret, "lyn_create_stream") # self.apu_stream, ret = sdk.lyn_create_stream() # common.error_check(ret, "lyn_create_stream") # self.plugin_stream, ret = sdk.lyn_create_stream() # common.error_check(ret, "lyn_create_stream") self.venc_recv_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.venc_send_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.ipe_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") # self.apu_event, ret = sdk.lyn_create_event() # common.error_check(ret, "lyn_create_event") self.plugin_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc() common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc") self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc() common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc") self.ipe_config, ret = sdk.lyn_ipe_create_config_desc() common.error_check(ret, "lyn_ipe_create_config_desc") ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc) common.error_check(ret, "lyn_ipe_reset_pic_desc") ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc) common.error_check(ret, "lyn_ipe_reset_pic_desc") ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config) common.error_check(ret, "lyn_ipe_reset_config_desc") # 获取模型信息 self.model_infos = [] self.model_nums = 0 for info in self.attr.model_configs: model_name = f'./models/{os.path.splitext(os.path.basename(info.model_path))[0]}' self.model_infos.append(ModelHandler(f'{model_name}/Net_0', f'{model_name}/plugin.so', info.model_name, info.model_code, info.handle_task, info.objects, self.attr.width, self.attr.height)) self.model_nums += 1 self.batch_size = self.model_infos[0].batch_size self.model_width = self.model_infos[0].model_width self.model_height = self.model_infos[0].model_height self.apu_output_size = self.model_infos[0].apu_output_size logger.debug(f'self.batch_size = {self.batch_size}') logger.debug(f'self.model_width = {self.model_width}') logger.debug(f'self.model_height = {self.model_height}') logger.debug(f'self.apu_output_size = {self.apu_output_size}') self.ipe_output_size = ( self.model_width * self.model_height * self.model_infos[0].model_desc.inputTensorAttrArray[0].dims[3] ) ( self.resize_width, self.resize_height, self.pad_x, self.pad_y, ) = set_padding_data( self.vdec_out_info.width, self.vdec_out_info.height, self.model_width, self.model_height, ) # 创建对象池 self.ipe_output_mem_pool = bufferpool.buffer_pool( self.ipe_output_size * self.batch_size, 5 ) self.venc_recv_pool = bufferpool.buffer_pool( self.vdec_out_info.predict_buf_size, 5 ) logger.debug(f'self.ipe_output_size = {self.ipe_output_size}') logger.debug(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}') # 设置编码参数 self.venc_attr = sdk.lyn_venc_attr_t() ret = sdk.lyn_venc_set_default_params(self.venc_attr) common.error_check(ret, "lyn_venc_set_default_params") self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264 self.venc_attr.width = self.vdec_out_info.width self.venc_attr.height = self.vdec_out_info.height self.venc_attr.bit_depth = 8 self.venc_attr.bframes_num = 0 self.venc_attr.pframes_num = 5 self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12 self.venc_attr.target_bitrate = 6000000 self.venc_attr.level = -1 self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr) common.error_check(ret, "lyn_vdec_open") self.total_boxes_info, ret = 
sdk.lyn_malloc(ctypes.sizeof(Box)) self.start_process_time = None self.last_push_time = None self.stream_out_process = None self.init_output_process() self.frame_step = 5 self.frame_idx = self.frame_step - 1 # 为了处理第0帧,这里不从0开始计数,而是从step-1开始 self.last_alarm_time = {} #key: model_code-objcet_code, value: time def init_output_process(self): command = ['ffmpeg', '-i', '-', "-c:v", "copy", '-f', 'rtsp', '-rtsp_transport', 'tcp', self.attr.output_path] # 启动FFmpeg子进程 try: self.stream_out_process = subprocess.Popen(command, stdin=subprocess.PIPE, stderr=subprocess.PIPE) except OSError as e: logger.exception(f"Failed to start process: {e}") except subprocess.SubprocessError as e: logger.exception(f"Subprocess error: {e}") except Exception as e: logger.exception(f"An unexpected error occurred: {e}") # stream_out_process = ( # ffmpeg # .input('pipe:', format='rawvideo', pix_fmt='bgr24', s='640x360', r=25) # .output(output_path, vcodec='libx264', # preset='ultrafast', tune='zerolatency', # bitrate='1000k',maxrate='1000k', bufsize='2000k', f='rtsp',g=10) # .run_async(pipe_stdin=True, pipe_stderr=True,overwrite_output=True, cmd=['ffmpeg', '-report']) # ) def read_stderr(process): for line in iter(process.stderr.readline, b''): logger.info(f"stderr: {line.decode('utf-8').strip()}") if self.stream_out_process: stderr_thread = threading.Thread(target=read_stderr, args=(self.stream_out_process,)) stderr_thread.daemon = True stderr_thread.start() def push_video(self): # show window global cancel_flag frame_rate = self.fps # 目标帧率 frame_interval = 1 / frame_rate # 每帧间隔时间 if not self.last_push_time: self.last_push_time = time.time() while True: try: frame = self.video_frame.get(False) except: if cancel_flag.value: sys.exit() else: time.sleep(0.01) continue if frame[1] or cancel_flag.value: # cv2.destroyAllWindows() sys.exit() if frame and self.stream_out_process: image = frame[0] # resized_image = cv2.resize(image, (640, 360)) self.stream_out_process.stdin.write(image.tobytes()) # elapsed_time = time.time() - self.start_process_time elapsed_time = time.time() - self.last_push_time sleep_time = frame_interval - elapsed_time - 0.01 # print(f'elapsed_time = {elapsed_time*1000:.3f}ms, sleep_time = {sleep_time*1000:.3f}ms') time.sleep(max(0, sleep_time)) self.last_push_time = time.time() def send_thread_func(self, cancel_flag): # 设置上下文环境 创建发送stream sdk.lyn_set_current_context(self.__ctx) eos = False while not eos: # 从解封装器读取一个包 pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl) eos = pkt.eos if ret != 0 and not eos: sdk.lyn_demux_close(self.__demux_hdl) time.sleep(500.0 / 1000) logger.warning("demux failed, reconnecting...") self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url) if not ret: self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl) common.error_check(ret, "lyn_synchronize_stream") logger.debug(f'fps = {self.fps}') continue # 发送给解码器解码 ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt) common.error_check(ret, "lyn_vdec_send_packet_async") ret = sdk.lyn_synchronize_stream(self.send_stream) common.error_check(ret, "lyn_synchronize_stream") # 释放packet内存并通知接收结果 if not eos: sdk.lyn_demux_free_packet(pkt) self.__send_num += 1 self.__send_queue.put(self.__send_num) else: self.__send_queue.put(-1) if cancel_flag.value: return def recv_thread_func(self, cancel_flag): # 设置上下文环境 创建接收stream sdk.lyn_set_current_context(self.__ctx) frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5) while self.__recv_num >= 0: self.__recv_num = self.__send_queue.take() 
            cb_data = recv_cb_data()
            cb_data.frame.eos = self.__recv_num < 0
            cb_data.frame.data = frame_pool.pop()
            cb_data.frame.size = self.vdec_out_info.predict_buf_size
            cb_data.frame_pool = frame_pool
            cb_data.send_num = self.__send_num
            cb_data.recv_num = self.__recv_num
            cb_data.attr = self.attr
            cb_data.block_queue = self.__ipe_queue
            cb_data.video_frame = self.video_frame
            # Queue the receive command and register the receive-complete callback
            ret = sdk.lyn_vdec_recv_frame_async(
                self.recv_stream, self.__vdec_hdl, cb_data.frame
            )
            common.error_check(ret, "lyn_vdec_recv_frame_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.recv_stream, recv_frame_cb, cb_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")
            if cancel_flag.value:
                return

    def ipe_thread_func(self, cancel_flag):
        # Bind the device context for the IPE stream
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            cb_data = self.__ipe_queue.take()
            eos = cb_data.frame.eos
            self.start_process_time = time.time()
            self.model_process(cb_data)  # cb_data here is a video frame decoded from the stream
            if cancel_flag.value:
                return

    def ipe_process(self, cb_data: framepool_cb_data):
        sdk.lyn_set_current_context(self.__ctx)
        ipe_out_data = self.ipe_output_mem_pool.pop()
        # Configure the IPE input
        ret = sdk.lyn_ipe_set_input_pic_desc(
            self.ipe_input_desc,
            cb_data.frame.data,
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.__vdec_attr.output_fmt,
        )
        common.error_check(ret, "lyn_ipe_set_input_pic_desc")
        ret = sdk.lyn_ipe_set_output_pic_data(self.ipe_output_desc, ipe_out_data)
        common.error_check(ret, "lyn_ipe_set_output_pic_data")
        ret = sdk.lyn_ipe_set_resize_config(
            self.ipe_config, self.resize_width, self.resize_height
        )
        common.error_check(ret, "lyn_ipe_set_resize_config")
        ret = sdk.lyn_ipe_set_pad_config(
            self.ipe_config,
            self.pad_y,
            self.pad_x,
            self.pad_y,
            self.pad_x,
            114,
            114,
            114,
        )
        common.error_check(ret, "lyn_ipe_set_pad_config")
        ret = sdk.lyn_ipe_set_c2c_config(
            self.ipe_config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0
        )
        common.error_check(ret, "lyn_ipe_set_c2c_config")
        ret = sdk.lyn_ipe_cal_output_pic_desc(
            self.ipe_output_desc, self.ipe_input_desc, self.ipe_config, 0
        )
        common.error_check(ret, "lyn_ipe_cal_output_pic_desc")
        ret = sdk.lyn_ipe_process_async(
            self.ipe_stream, self.ipe_input_desc, self.ipe_output_desc, self.ipe_config
        )
        common.error_check(ret, "lyn_ipe_process_async")
        # ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        # common.error_check(ret, "lyn_record_event")
        return ipe_out_data

    def get_channel_info(self) -> str:
        return f'{self.attr.device_id}_{self.attr.output_path}'

    def model_process(self, cb_data):
        self.frame_idx = (self.frame_idx + 1) % self.frame_step
        if self.frame_idx == 0:
            # print(f'{self.frame_idx} process')
            # Full processing path
            ipe_out_data = self.ipe_process(cb_data)
            for model_info in self.model_infos:
                apu_output_data = self.apu_process(model_info, ipe_out_data)
                self.plugin_process(model_info, apu_output_data, cb_data)
            ret = sdk.lyn_stream_add_async_callback(
                self.model_infos[-1].plugin_stream,
                show_video_cb,
                [cb_data, self.model_infos, self.last_alarm_time],
            )
            common.error_check(ret, "lyn_stream_add_async_callback")
            # self.merge_plugin_process(cb_data)
            self.encode_process(cb_data)
            common.print_frame_rate(self.get_channel_info())
            ret = sdk.lyn_stream_add_async_callback(
                self.model_infos[-1].apu_stream,
                free_to_pool_callback,
                [self.ipe_output_mem_pool, ipe_out_data],
            )
            common.error_check(ret, "lyn_stream_add_async_callback")
        else:
            # print(f'{self.frame_idx} draw')
            # Skip the IPE/APU stages and re-draw with the previous boxes
            for model_info in self.model_infos:
                self.plugin_draw_process(model_info, cb_data)
            self.encode_process(cb_data)
            common.print_frame_rate(self.get_channel_info())

    def apu_process(self, model_info, ipe_out_data):
        # Wait for IPE processing to finish
        ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.apu_stream, self.ipe_event)
        common.error_check(ret, "lyn_stream_wait_event")
        apu_output_data = model_info.apu_output_mem_pool.pop()
        ret = sdk.lyn_execute_model_async(
            model_info.apu_stream,
            model_info.model,
            ipe_out_data,
            apu_output_data,
            self.batch_size,
        )
        common.error_check(ret, "lyn_execute_model_async")
        return apu_output_data

    def plugin_process(self, model_info, apu_output_data, cb_data):
        # Wait for APU processing to finish
        ret = sdk.lyn_record_event(model_info.apu_stream, model_info.apu_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.plugin_stream, model_info.apu_event)
        common.error_check(ret, "lyn_stream_wait_event")

        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        apu_data_ptr = pythonapi.PyCapsule_GetPointer(apu_output_data, None)
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)
        sub_objects_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.sub_objects_info, None)

        post_para = struct.pack(
            '6IH2f?3P',
            self.codec_para.width,
            self.codec_para.height,
            self.model_width,
            self.model_height,
            model_info.class_num,
            500,  # nmsTopK
            model_info.anchor_size,
            0.25,  # score threshold
            0.45,  # nms threshold
            True,  # is pad resize
            apu_data_ptr,
            boxes_info_ptr,
            sub_objects_info_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream, model_info.plugin, "lynPostProcess", post_para, len(post_para)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream,
            model_info.plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")
        # ret = sdk.lyn_stream_add_callback(
        #     model_info.plugin_stream,
        #     show_boxinfo_cb,
        #     [model_info.model_name, model_info.boxes_info],
        # )
        # ret = sdk.lyn_stream_add_async_callback(
        #     model_info.plugin_stream, show_video_cb, [cb_data, model_info.boxes_info, model_info.model_code]
        # )
        # common.error_check(ret, "lyn_stream_add_async_callback")
        ret = sdk.lyn_stream_add_async_callback(
            model_info.plugin_stream,
            free_to_pool_callback,
            [model_info.apu_output_mem_pool, apu_output_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def plugin_draw_process(self, model_info, cb_data):
        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)
        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream,
            model_info.plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

    def merge_plugin_process(self, cb_data):
        plugin = self.model_infos[-1].plugin
        plugin_stream = self.model_infos[-1].plugin_stream
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        target_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        sources_ptrs = [
            pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)
            for model_info in self.model_infos
        ]
        while len(sources_ptrs) < 4:
            sources_ptrs.append(0)
        box_merge_params = struct.pack(
            "P4PH",                 # layout: target pointer + 4 source pointers + source count
            target_ptr,             # address of the target box buffer
            *sources_ptrs,          # addresses of the source box buffers
            len(self.model_infos),  # number of sources
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream, plugin, "mergeBoxes", box_merge_params, len(box_merge_params)
        )
        common.error_check(ret, "lyn_plugin_run_async")
        # ret = sdk.lyn_stream_add_callback(
        #     plugin_stream,
        #     show_boxinfo_cb,
        #     ['all', self.total_boxes_info],
        # )
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream,
            plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")
        ret = sdk.lyn_stream_add_async_callback(
            plugin_stream, show_video_cb, [cb_data, self.total_boxes_info]
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def encode_process(self, cb_data: cb_data):
        # Encode after the plugin stage has finished
        ret = sdk.lyn_record_event(self.model_infos[-1].plugin_stream, self.plugin_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event)
        common.error_check(ret, "lyn_stream_wait_event")

        if self.enc_head_flag:  # first frame: fetch the encoder parameter sets
            self.enc_head_flag = False
            enc_packet = sdk.lyn_packet_t()
            enc_packet.size = self.vdec_out_info.predict_buf_size
            enc_packet.data = self.venc_recv_pool.pop()
            encode_data = save_file_cb_data()
            encode_data.output_path = self.attr.output_path
            encode_data.packet = enc_packet
            encode_data.recv_pool = self.venc_recv_pool
            encode_data.file_path = self.attr.output_path
            encode_data.video_frame = self.attr.video_frame
            ret = sdk.lyn_venc_get_paramsset_async(
                self.venc_recv_stream, self.venc_handle, enc_packet
            )
            common.error_check(ret, "lyn_venc_get_paramsset_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.venc_recv_stream, save_file_cb, [encode_data, cb_data]
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_venc_sendframe_async(
            self.venc_send_stream, self.venc_handle, cb_data.frame
        )
        common.error_check(ret, "lyn_venc_sendframe_async")

        enc_packet = sdk.lyn_packet_t()
        enc_packet.size = self.vdec_out_info.predict_buf_size
        enc_packet.eos = cb_data.frame.eos
        enc_packet.data = self.venc_recv_pool.pop()
        encode_data = save_file_cb_data()
        encode_data.packet = enc_packet
        encode_data.recv_pool = self.venc_recv_pool
        encode_data.file_path = self.attr.output_path
        encode_data.output_path = self.attr.output_path
        encode_data.video_frame = self.attr.video_frame
        # ret = sdk.lyn_stream_add_async_callback(
        #     self.venc_send_stream,
        #     free_to_pool_callback,
        #     [cb_data.frame_pool, cb_data.frame.data],
        # )
        # common.error_check(ret, "lyn_stream_add_async_callback")
        ret = sdk.lyn_venc_recvpacket_async(
            self.venc_recv_stream, self.venc_handle, enc_packet
        )
        common.error_check(ret, "lyn_venc_recvpacket_async")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream, save_file_cb, [encode_data, cb_data]
        )
        common.error_check(ret, "lyn_stream_add_async_callback")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream,
            free_to_pool_callback,
            [cb_data.frame_pool, cb_data.frame.data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def run(self):
        global cancel_flag
        # Start the send thread
        self.__send_thread = threading.Thread(
            target=self.send_thread_func, args=(cancel_flag,)
        )
        self.__send_thread.start()
        # Start the receive thread
        self.__recv_thread = threading.Thread(
            target=self.recv_thread_func, args=(cancel_flag,)
        )
        self.__recv_thread.start()
        # Start the image-processing and inference thread
        self.__ipe_thread = threading.Thread(
            target=self.ipe_thread_func, args=(cancel_flag,)
        )
        self.__ipe_thread.start()

    def close(self):
        if self.__send_thread.is_alive():
            self.__send_thread.join()
        if self.__recv_thread.is_alive():
            self.__recv_thread.join()
        if self.__ipe_thread.is_alive():
            self.__ipe_thread.join()

        ret = sdk.lyn_synchronize_stream(self.send_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.recv_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.ipe_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        # The APU and plugin streams live on each ModelHandler, so synchronize them per model
        for model_info in self.model_infos:
            ret = sdk.lyn_synchronize_stream(model_info.apu_stream)
            common.error_check(ret, "lyn_synchronize_stream")
            ret = sdk.lyn_synchronize_stream(model_info.plugin_stream)
            common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.venc_recv_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.venc_send_stream)
        common.error_check(ret, "lyn_synchronize_stream")

        ret = sdk.lyn_destroy_stream(self.send_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.recv_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.ipe_stream)
        common.error_check(ret, "lyn_destroy_stream")
        for model_info in self.model_infos:
            ret = sdk.lyn_destroy_stream(model_info.apu_stream)
            common.error_check(ret, "lyn_destroy_stream")
            ret = sdk.lyn_destroy_stream(model_info.plugin_stream)
            common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.venc_recv_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.venc_send_stream)
        common.error_check(ret, "lyn_destroy_stream")

        ret = sdk.lyn_destroy_event(self.ipe_event)
        common.error_check(ret, "lyn_destroy_event")
        ret = sdk.lyn_destroy_event(self.plugin_event)
        common.error_check(ret, "lyn_destroy_event")

        # Destroy ipe desc and config
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_destroy_config_desc")

        # Unregister plugins and unload models
        for model_info in self.model_infos:
            ret = sdk.lyn_plugin_unregister(model_info.plugin)
            common.error_check(ret, "lyn_plugin_unregister")
            ret = sdk.lyn_unload_model(model_info.model)
            common.error_check(ret, "lyn_unload_model")

        if self.__vdec_hdl != "":
            ret = sdk.lyn_vdec_close(self.__vdec_hdl)
            common.error_check(ret, "lyn_vdec_close")
        if self.__ctx != "":
            ret = sdk.lyn_destroy_context(self.__ctx)
            common.error_check(ret, "lyn_destroy_context")


def run_device(device_no: str, input_url: str, output_url: str, device_id: int, model_configs, alarm_interval, threads) -> None:
    attr = device_process_attr()
    attr.device_no = device_no
    attr.url = input_url
    attr.output_path = output_url
    attr.device_id = device_id
    attr.alarm_interval = alarm_interval
    attr.video_frame = queue.Queue(10)
    attr.model_configs = model_configs

    if not attr.url:
        raise ValueError('input url missing!!!')
    if not attr.output_path:
        raise ValueError('unspecified output path!!!')

    try:
        decoder = yolov5(attr)
        infer_thread = threading.Thread(target=decoder.run, args=())
        threads.append(infer_thread)
        infer_thread.start()

        out_thread = threading.Thread(target=decoder.push_video, args=())
        threads.append(out_thread)
        out_thread.start()
    except Exception as e:
        logger.exception(f'model thread start failed: {e}')
        # decoder.close()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, cancel_process)

    devices = [
        {
            'input_url': 'rtsp://admin:Casic203@192.168.83.64:554/Streaming/Channels/101',
            'output_url': 'rtmp://192.168.83.100:1935/live/101',
            'model_configs': [
                {
                    'model_path': 'models/yolov5s/Net_0',
                    'plugin_path': 'plugins/libYolov5Plugin.so',
                    'model_name': 'yolov5s',
                },
                {
                    'model_path': 'models/yuntai/Net_0',
                    'plugin_path': 'plugins/libMultiPlugin.so',
                    'model_name': 'yuntai',
                }
            ]
        },
        {
            'input_url': 'rtsp://admin:Casic203@192.168.83.64:554/Streaming/Channels/101',
            'output_url': 'rtmp://192.168.83.100:1935/live/102',
            'model_configs': [
                {
                    'model_path': 'models/yolov5s/Net_0',
                    'plugin_path': 'plugins/libYolov5Plugin.so',
                    'model_name': 'yolov5s',
                },
                {
                    'model_path': 'models/yuntai/Net_0',
                    'plugin_path': 'plugins/libMultiPlugin.so',
                    'model_name': 'yuntai',
                }
            ]
        },
        # {
        #     'input_url': 'rtsp://admin:Casic203@192.168.1.64:554/Streaming/Channels/101',
        #     'output_url': 'rtsp://192.168.1.100:8554/live/test3',
        #     'model_configs': [
        #         {
        #             'model_path': 'models/yolov5s/Net_0',
        #             'plugin_path': 'plugins/libYolov5Plugin.so',
        #             'model_name': 'yolov5s',
        #         },
        #         {
        #             'model_path': 'models/yuntai/Net_0',
        #             'plugin_path': 'plugins/libMultiPlugin.so',
        #             'model_name': 'yuntai',
        #         }
        #     ]
        # },
        # {
        #     'input_url': 'rtsp://admin:Casic203@192.168.1.64:554/Streaming/Channels/101',
        #     'output_url': 'rtsp://192.168.1.100:8554/live/test4',
        #     'model_configs': [
        #         {
        #             'model_path': 'models/yolov5s/Net_0',
        #             'plugin_path': 'plugins/libYolov5Plugin.so',
        #             'model_name': 'yolov5s',
        #         },
        #         {
        #             'model_path': 'models/yuntai/Net_0',
        #             'plugin_path': 'plugins/libMultiPlugin.so',
        #             'model_name': 'yuntai',
        #         }
        #     ]
        # }
    ]

    device_count, ret = sdk.lyn_get_device_count()
    if ret != 0 or device_count <= 0:
        print("Error: Unable to fetch device count or no devices available.")
        exit(1)

    threads = []
    for idx, device in enumerate(devices):
        # Distribute device_id evenly via modulo; device IDs start at 0
        device_id = idx % device_count
        # NOTE: the sample configs above do not define device_no or alarm_interval;
        # the index and 0 are assumed placeholders so the call matches run_device's signature.
        run_device(str(idx), device['input_url'], device['output_url'], device_id,
                   device['model_configs'], 0, threads)

    # Wait for all threads to finish
    for thread in threads:
        thread.join()