"""Multi-model detection pipeline on a Lynxi device with RTSP re-streaming.

Data flow: demux -> hardware decode -> IPE resize/pad/colour-convert ->
one APU inference per entry of ``model_config`` -> plugin post-processing ->
box merge + draw -> H.264 encode -> ffmpeg subprocess (``-c:v copy``) -> RTSP.
"""
from blockqueue import block_queue
from callback_data_struct import *
import common
import bufferpool
import math
import queue
import sys
import os
from types import SimpleNamespace
from typing import List, Tuple
import time
import numpy as np
import pylynchipsdk as sdk
import copy
import threading
import multiprocessing
import signal
import json
import ctypes
from ctypes import *
import struct
import subprocess
import argparse

# import ffmpeg
try:
    import cv2
except ImportError:
    cv2 = None

# One entry per model that is run on every decoded frame.
model_config = [
    {
        'model_path': 'models/yolov5s/Net_0',
        'plugin_path': 'plugins/libYolov5Plugin.so',
        'model_name': 'yolov5s',
    },
    {
        'model_path': 'models/yuntai/Net_0',
        'plugin_path': 'plugins/libMultiPlugin.so',
        'model_name': 'yuntai',
    }
]

# Maximum number of per-model box buffers the "mergeBoxes" parameter block
# can carry (the struct format below has exactly four source pointers).
MAX_MERGE_SOURCES = 4

# ffmpeg subprocess receiving the encoded stream on stdin (set by
# init_output_process) and the wall-clock time at which the IPE thread
# started processing the most recent frame (set by ipe_thread_func).
stream_out_process = None
start_process_time = None

# PyCapsule_GetPointer returns a C pointer; without an explicit prototype
# ctypes would treat the result as a C int. Configure it once here so every
# caller gets the full address regardless of call order.
pythonapi.PyCapsule_GetPointer.restype = c_void_p
pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]


def _capsule_address(capsule):
    """Return the raw address wrapped by an (unnamed) PyCapsule."""
    return pythonapi.PyCapsule_GetPointer(capsule, None)


def cancel_process(signum, frame):
    """Signal handler: raise the shared cancel flag."""
    global cancel_flag
    cancel_flag.value = True


cancel_flag = multiprocessing.Value('b', False)


class bbox(ctypes.Structure):
    """One detection box as laid out in the plugin's output buffer."""

    _fields_ = [
        ("xmin", ctypes.c_uint32),
        ("ymin", ctypes.c_uint32),
        ("xmax", ctypes.c_uint32),
        ("ymax", ctypes.c_uint32),
        ("score", ctypes.c_float),
        ("id", ctypes.c_uint32),
        ("label", ctypes.c_wchar * 64),
    ]


class Box(ctypes.Structure):
    """Fixed-capacity list of up to 256 ``bbox`` entries plus a count."""

    _fields_ = [("boxesnum", ctypes.c_uint32), ("boxes", ctypes.ARRAY(bbox, 256))]


class ModelInfo:
    """Per-model SDK resources: model, plugin, streams, events and buffers."""

    def __init__(self, model_path, plugin_path, model_name = '') -> None:
        """Load the model and plugin and allocate the per-model resources.

        Args:
            model_path: Path passed to ``sdk.lyn_load_model``.
            plugin_path: Path passed to ``sdk.lyn_plugin_register``.
            model_name: Label used only for logging.
        """
        self.model_path = model_path
        self.plugin_path = plugin_path
        self.model_name = model_name
        print(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}')

        self.model, ret = sdk.lyn_load_model(self.model_path)
        common.error_check(ret, "lyn_load_model")
        self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path)
        common.error_check(ret, "lyn_plugin_register")

        self.apu_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.plugin_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.apu_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.model_desc, ret = sdk.lyn_model_get_desc(self.model)
        common.error_check(ret, "lyn_model_get_desc")
        input_attr = self.model_desc.inputTensorAttrArray[0]
        output_attr = self.model_desc.outputTensorAttrArray[0]
        self.batch_size = input_attr.batchSize
        self.model_width = input_attr.dims[2]
        self.model_height = input_attr.dims[1]
        self.apu_output_size = self.model_desc.outputDataLen
        self.class_num = output_attr.dims[3] - 5
        self.anchor_size = output_attr.dims[1]

        # Device buffer the post-process plugin writes this model's boxes to.
        self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc")
        self.apu_output_mem_pool = bufferpool.buffer_pool(
            self.apu_output_size * self.batch_size, 5
        )

        print(f'batch_size = {self.batch_size}')
        print(f'model_width = {self.model_width}')
        print(f'model_height = {self.model_height}')
        print(f'apu_output_size = {self.apu_output_size}')
        print(f'class_num = {self.class_num}')
        print(f'anchor_size = {self.anchor_size}')


def recv_frame_cb(cb_data: cb_data):
    """Stream callback: hand a decoded frame to the IPE processing queue."""
    cb_data.block_queue.put(cb_data)
    return 0


def free_to_pool_callback(params):
    """Stream callback: return a buffer to its pool.

    Args:
        params: ``[pool, buffer]``.
    """
    pool, buffer = params[0], params[1]
    pool.push(buffer)
    return 0


def set_even(num):
    """Floor ``num`` and round it down to the nearest even integer."""
    floored = math.floor(num)
    return floored - (floored % 2)


def set_padding_data(vidoe_width, video_height, model_width, model_height):
    """Compute the letterbox resize size and padding for a video frame.

    Returns:
        ``(resize_width, resize_height, pad_x, pad_y)`` as ints.
    """
    # NOTE(review): the scale is chosen by video orientation and always uses
    # model_width as the numerator. That matches a min-ratio letterbox only
    # when the model input is square - confirm before using non-square models.
    if vidoe_width > video_height:
        resize_width = model_width
        resize_height = set_even(int(video_height * model_width / vidoe_width))
        pad_x = 0
        pad_y = (model_height - resize_height) / 2
    else:
        resize_height = model_height
        resize_width = set_even(int(vidoe_width * model_width / video_height))
        pad_y = 0
        pad_x = (model_width - resize_width) / 2
    return int(resize_width), int(resize_height), int(pad_x), int(pad_y)


def show_video_cb(cb_data: cb_data):
    """Stream callback: copy an NV12 frame to the host and queue it as BGR."""
    import cv2

    host_frame = np.empty(cb_data.frame.size, dtype=np.uint8)
    ret = sdk.lyn_memcpy(
        sdk.lyn_numpy_to_ptr(host_frame),
        cb_data.frame.data,
        cb_data.frame.size,
        sdk.lyn_memcpy_dir_t.ServerToClient,
    )
    common.error_check(ret, "lyn_memcpy")

    yuv_img = np.reshape(
        host_frame, (cb_data.attr.height * 3 // 2, cb_data.attr.width)
    ).astype(np.uint8)
    bgr_img = cv2.cvtColor(yuv_img, cv2.COLOR_YUV2BGR_NV12)

    if not cb_data.frame.eos:
        # Keep only the newest frame so the viewer never lags behind.
        cb_data.attr.video_frame.queue.clear()
    cb_data.attr.video_frame.put([bgr_img, cb_data.frame.eos])
    cb_data.frame_pool.push(cb_data.frame.data)
    return 0


def save_file_cb(cb_data: save_file_cb_data):
    """Stream callback: copy an encoded packet to the host and queue it.

    The packet bytes are put on ``cb_data.video_frame`` as ``[data, eos]`` and
    the device buffer is returned to ``cb_data.recv_pool``.
    """
    packet = cb_data.packet
    data_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(packet)
    common.error_check(ret, "lyn_enc_get_remote_packet_valid_size")

    host_packet = np.zeros(data_size, dtype=np.byte)
    ret = sdk.lyn_memcpy(
        sdk.lyn_numpy_to_ptr(host_packet),
        packet.data,
        data_size,
        sdk.lyn_memcpy_dir_t.ServerToClient,
    )
    common.error_check(ret, "lyn_memcpy")

    cb_data.video_frame.put([host_packet, packet.eos])
    cb_data.recv_pool.push(packet.data)
    return ret


def show_boxinfo_cb(params):
    """Stream callback: copy a ``Box`` buffer to the host and print it.

    Args:
        params: ``[model_name, boxes_info]`` where ``boxes_info`` is the
            device buffer holding a ``Box`` structure.
    """
    model_name, boxes_info = params[0], params[1]
    box_bytes = ctypes.sizeof(Box)
    host_buf_arr = np.ones(box_bytes, dtype=np.uint8)
    host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
    ret = sdk.lyn_memcpy(
        host_buf, boxes_info, box_bytes, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "save_boxinfo_cb lyn_memcpy")

    # host_buf_arr must stay alive while c_box is in use: c_box is a view
    # onto its memory, not a copy.
    c_box = ctypes.cast(_capsule_address(host_buf), ctypes.POINTER(Box)).contents
    dump_box_json(model_name, c_box)
    return 0


def dump_box_json(model_name, c_box):
    """Print the model name, box count and box ids of ``c_box``."""
    boxes = [
        {
            "xmin": entry.xmin,
            "ymin": entry.ymin,
            "xmax": entry.xmax,
            "ymax": entry.ymax,
            "score": entry.score,
            "id": entry.id,
            "label": entry.label,
        }
        for entry in (c_box.boxes[i] for i in range(c_box.boxesnum))
    ]
    box_dict = {"boxesnum": c_box.boxesnum, "boxes": boxes}
    print(model_name,box_dict['boxesnum'],[box["id"] for box in box_dict["boxes"]])


def init_output_process(output_path):
    """Start the ffmpeg subprocess that republishes the stream over RTSP.

    ffmpeg reads the already-encoded stream from stdin and copies it to
    ``output_path`` without re-encoding. On success the process is stored in
    the module-level ``stream_out_process`` and a daemon thread echoes its
    stderr. On failure a message is printed and ``stream_out_process`` is
    left unchanged.
    """
    global stream_out_process

    command = ['ffmpeg',
               '-i', '-',
               "-c:v", "copy",
               '-f', 'rtsp',
               '-rtsp_transport', 'tcp',
               output_path]

    # Launch the FFmpeg subprocess.
    try:
        stream_out_process = subprocess.Popen(command, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        print(f"Failed to start process: {e}")
        return
    except subprocess.SubprocessError as e:
        print(f"Subprocess error: {e}")
        return
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return

    def read_stderr(process):
        for line in iter(process.stderr.readline, b''):
            print(f"stderr: {line.decode('utf-8')}", end='')

    if stream_out_process:
        stderr_thread = threading.Thread(target=read_stderr, args=(stream_out_process,))
        stderr_thread.daemon = True
        stderr_thread.start()


def push_video(video_frame, cancel_flag):
    """Forward queued packets to the ffmpeg subprocess until EOS or cancel.

    Args:
        video_frame: Queue of ``[data, eos]`` items produced by
            ``save_file_cb``; ``data`` must provide ``tobytes()``.
        cancel_flag: Shared flag; a truthy ``value`` stops the loop.
    """
    frame_rate = 25  # target frame rate
    frame_interval = 1 / frame_rate  # time budget per frame

    process = stream_out_process
    if process is None or process.stdin is None:
        print("push_video: output process is not running")
        return
    sink = process.stdin

    try:
        while not cancel_flag.value:
            # Block briefly instead of spinning on an empty queue.
            try:
                payload, eos = video_frame.get(timeout=frame_interval)
            except queue.Empty:
                continue
            if eos or cancel_flag.value:
                break
            try:
                sink.write(payload.tobytes())
                sink.flush()
            except (BrokenPipeError, ValueError) as e:
                # ffmpeg exited or its stdin was closed: stop the pipeline.
                print(f"push_video: output pipe closed: {e}")
                cancel_flag.value = True
                break
            # Pace against the time the IPE thread began the latest frame.
            frame_started = start_process_time
            if frame_started is not None:
                elapsed_time = time.time() - frame_started
                time.sleep(max(0, frame_interval - elapsed_time))
    finally:
        # Closing stdin lets ffmpeg see end-of-input and finish the stream.
        try:
            sink.close()
        except OSError:
            pass


class yolov5:
    """One decode -> infer -> encode channel bound to a device context."""

    def __init__(self, attr: infer_process_attr) -> None:
        """Create the context, decoder, IPE descriptors, models and encoder.

        Args:
            attr: Channel attributes; ``url``, ``device_id``, ``output_fmt``,
                ``scale``, ``output_path`` and ``video_frame`` are read here
                or later by the worker threads.
        """
        self.attr = ""
        self.__ctx = ""
        self.__send_thread = ""
        self.__recv_thread = ""
        self.__ipe_thread = ""
        self.__demux_hdl = ""
        self.codec_para = ""
        self.__vdec_hdl = ""
        self.__vdec_attr = sdk.lyn_vdec_attr_t()
        self.__send_queue = block_queue()
        self.__ipe_queue = block_queue()
        self.__send_num = 0
        self.__recv_num = 0

        # Create the device context.
        self.attr = copy.copy(attr)
        self.video_frame = attr.video_frame
        self.enc_head_flag = True
        self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id)
        common.error_check(ret, "create context")
        ret = sdk.lyn_register_error_handler(common.default_stream_error_handle)
        common.error_check(ret, 'lyn_register_error_handler')

        # Open the demuxer.
        self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
        common.error_check(ret, "lyn_demux_open ")
        self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_codec_para ")

        # Open the decoder.
        self.__vdec_attr.codec_id = self.codec_para.codec_id
        self.__vdec_attr.output_fmt = self.attr.output_fmt
        self.__vdec_attr.scale = self.attr.scale
        self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr)
        common.error_check(ret, "lyn_vdec_open ")
        self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info(
            self.codec_para, self.__vdec_attr
        )
        common.error_check(ret, "lyn_vdec_get_out_info ")
        print(f'self.vdec_out_info.width = {self.vdec_out_info.width}')
        print(f'self.vdec_out_info.height = {self.vdec_out_info.height}')

        # Create the streams, events and IPE descriptors.
        self.send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.ipe_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.ipe_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc")
        self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc")
        self.ipe_config, ret = sdk.lyn_ipe_create_config_desc()
        common.error_check(ret, "lyn_ipe_create_config_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_reset_config_desc")

        # Load every configured model; the first one defines the input shape.
        self.model_infos = [
            ModelInfo(info['model_path'], info['plugin_path'], info['model_name'])
            for info in model_config
        ]
        self.model_nums = len(self.model_infos)
        primary = self.model_infos[0]
        self.batch_size = primary.batch_size
        self.model_width = primary.model_width
        self.model_height = primary.model_height
        self.apu_output_size = primary.apu_output_size
        print(f'self.batch_size = {self.batch_size}')
        print(f'self.model_width = {self.model_width}')
        print(f'self.model_height = {self.model_height}')
        print(f'self.apu_output_size = {self.apu_output_size}')

        self.ipe_output_size = (
            self.model_width
            * self.model_height
            * primary.model_desc.inputTensorAttrArray[0].dims[3]
        )
        (
            self.resize_width,
            self.resize_height,
            self.pad_x,
            self.pad_y,
        ) = set_padding_data(
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.model_width,
            self.model_height,
        )
        self.attr.width = self.vdec_out_info.width
        self.attr.height = self.vdec_out_info.height

        # Create the buffer pools.
        self.ipe_output_mem_pool = bufferpool.buffer_pool(
            self.ipe_output_size * self.batch_size, 5
        )
        self.venc_recv_pool = bufferpool.buffer_pool(
            self.vdec_out_info.predict_buf_size, 5
        )
        print(f'self.ipe_output_size = {self.ipe_output_size}')
        print(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}')

        # Configure and open the encoder.
        self.venc_attr = sdk.lyn_venc_attr_t()
        ret = sdk.lyn_venc_set_default_params(self.venc_attr)
        common.error_check(ret, "lyn_venc_set_default_params")
        self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264
        self.venc_attr.width = self.vdec_out_info.width
        self.venc_attr.height = self.vdec_out_info.height
        self.venc_attr.bit_depth = 8
        self.venc_attr.bframes_num = 0
        self.venc_attr.pframes_num = 5
        self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12
        self.venc_attr.target_bitrate = 6000000
        self.venc_attr.level = -1
        self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr)
        common.error_check(ret, "lyn_venc_open")

        # Device buffer receiving the merged boxes of all models.
        self.total_boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc")

    def send_thread_func(self, cancel_flag):
        """Read packets from the demuxer and feed them to the decoder.

        After each packet the running packet count (or ``-1`` at EOS) is put
        on the send queue for the receive thread. A failed read closes and
        reopens the demuxer after 0.5 s.
        """
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            # Read one packet from the demuxer.
            pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl)
            eos = pkt.eos
            if ret != 0 and not eos:
                sdk.lyn_demux_close(self.__demux_hdl)
                time.sleep(0.5)
                print("demux failed, reconnecting...")
                self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
                continue

            # Send it to the decoder and wait for the submission to finish.
            ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt)
            common.error_check(ret, "lyn_vdec_send_packet_async")
            ret = sdk.lyn_synchronize_stream(self.send_stream)
            common.error_check(ret, "lyn_synchronize_stream")

            # Release the packet and notify the receive thread.
            if eos:
                self.__send_queue.put(-1)
            else:
                sdk.lyn_demux_free_packet(pkt)
                self.__send_num += 1
                self.__send_queue.put(self.__send_num)

            if cancel_flag.value:
                return

    def recv_thread_func(self, cancel_flag):
        """Queue one decoder receive per sent packet until EOS or cancel.

        Each decoded frame is delivered to the IPE queue by ``recv_frame_cb``.
        """
        sdk.lyn_set_current_context(self.__ctx)
        frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5)
        while self.__recv_num >= 0:
            self.__recv_num = self.__send_queue.take()

            cb_data = recv_cb_data()
            cb_data.frame.eos = self.__recv_num < 0
            cb_data.frame.data = frame_pool.pop()
            cb_data.frame.size = self.vdec_out_info.predict_buf_size
            cb_data.frame_pool = frame_pool
            cb_data.send_num = self.__send_num
            cb_data.recv_num = self.__recv_num
            cb_data.attr = self.attr
            cb_data.block_queue = self.__ipe_queue
            cb_data.video_frame = self.video_frame

            # Queue the receive and a completion callback behind it.
            ret = sdk.lyn_vdec_recv_frame_async(
                self.recv_stream, self.__vdec_hdl, cb_data.frame
            )
            common.error_check(ret, "lyn_vdec_recv_frame_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.recv_stream, recv_frame_cb, cb_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

            if cancel_flag.value:
                return

    def ipe_thread_func(self, cancel_flag):
        """Run IPE pre-processing and inference for every decoded frame."""
        global start_process_time
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            cb_data = self.__ipe_queue.take()
            eos = cb_data.frame.eos
            # push_video paces its output against this timestamp.
            start_process_time = time.time()
            ipe_out_data = self.ipe_process(cb_data)
            self.model_process(ipe_out_data, cb_data)
            if cancel_flag.value:
                return

    def ipe_process(self, cb_data: framepool_cb_data):
        """Queue resize + pad + RGB24 conversion of a frame on the IPE stream.

        Returns:
            The pooled output buffer the IPE writes the model input into.
        """
        sdk.lyn_set_current_context(self.__ctx)
        ipe_out_data = self.ipe_output_mem_pool.pop()

        # Describe the IPE input and output pictures.
        ret = sdk.lyn_ipe_set_input_pic_desc(
            self.ipe_input_desc,
            cb_data.frame.data,
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.__vdec_attr.output_fmt,
        )
        common.error_check(ret, "lyn_ipe_set_input_pic_desc")
        ret = sdk.lyn_ipe_set_output_pic_data(self.ipe_output_desc, ipe_out_data)
        common.error_check(ret, "lyn_ipe_set_output_pic_data")

        ret = sdk.lyn_ipe_set_resize_config(
            self.ipe_config, self.resize_width, self.resize_height
        )
        common.error_check(ret, "lyn_ipe_set_resize_config")
        ret = sdk.lyn_ipe_set_pad_config(
            self.ipe_config,
            self.pad_y,
            self.pad_x,
            self.pad_y,
            self.pad_x,
            114,
            114,
            114,
        )
        common.error_check(ret, "lyn_ipe_set_pad_config")
        ret = sdk.lyn_ipe_set_c2c_config(
            self.ipe_config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0
        )
        common.error_check(ret, "lyn_ipe_set_c2c_config")
        ret = sdk.lyn_ipe_cal_output_pic_desc(
            self.ipe_output_desc, self.ipe_input_desc, self.ipe_config, 0
        )
        common.error_check(ret, "lyn_ipe_cal_output_pic_desc")
        ret = sdk.lyn_ipe_process_async(
            self.ipe_stream, self.ipe_input_desc, self.ipe_output_desc, self.ipe_config
        )
        common.error_check(ret, "lyn_ipe_process_async")
        return ipe_out_data

    def get_channel_info(self) -> str:
        """Return ``"<device_id>_<chan_id>"`` for this channel."""
        return f'{self.attr.device_id}_{self.attr.chan_id}'

    def model_process(self, ipe_out_data, cb_data):
        """Run every model on the frame, merge and draw boxes, then encode."""
        for model_info in self.model_infos:
            apu_output_data = self.apu_process(model_info, ipe_out_data)
            self.plugin_process(model_info, apu_output_data, cb_data)
        self.merge_plugin_process(cb_data)
        common.print_frame_rate(self.get_channel_info())
        self.encode_process(cb_data)

        # Every model reads ipe_out_data on its own APU stream, so the buffer
        # may only go back to the pool once all of them are done. Each
        # apu_event was recorded after the model's inference in
        # plugin_process; make the last APU stream wait for the others.
        last_apu_stream = self.model_infos[-1].apu_stream
        for model_info in self.model_infos[:-1]:
            ret = sdk.lyn_stream_wait_event(last_apu_stream, model_info.apu_event)
            common.error_check(ret, "lyn_stream_wait_event")
        ret = sdk.lyn_stream_add_async_callback(
            last_apu_stream,
            free_to_pool_callback,
            [self.ipe_output_mem_pool, ipe_out_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def apu_process(self, model_info, ipe_out_data):
        """Queue one inference of ``model_info`` once the IPE has finished.

        Returns:
            The pooled buffer that will receive the model output.
        """
        # Wait for the IPE to finish.
        ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.apu_stream, self.ipe_event)
        common.error_check(ret, "lyn_stream_wait_event")

        apu_output_data = model_info.apu_output_mem_pool.pop()
        ret = sdk.lyn_execute_model_async(
            model_info.apu_stream,
            model_info.model,
            ipe_out_data,
            apu_output_data,
            self.batch_size,
        )
        common.error_check(ret, "lyn_execute_model_async")
        return apu_output_data

    def plugin_process(self,model_info, apu_output_data, cb_data):
        """Queue the post-process plugin for one model's inference output.

        The plugin's ``lynPostProcess`` entry writes boxes into
        ``model_info.boxes_info``; the APU output buffer is returned to its
        pool once the plugin stream reaches that point.
        """
        # Wait for the APU to finish.
        ret = sdk.lyn_record_event(model_info.apu_stream, model_info.apu_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.plugin_stream, model_info.apu_event)
        common.error_check(ret, "lyn_stream_wait_event")

        post_para = struct.pack(
            '6IH2f?2P',
            self.codec_para.width,
            self.codec_para.height,
            self.model_width,
            self.model_height,
            model_info.class_num,
            500,  # nmsTopK
            model_info.anchor_size,
            0.25,  # score threshold
            0.45,  # nms threshold
            True,  # is pad resize
            _capsule_address(apu_output_data),
            _capsule_address(model_info.boxes_info),
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream, model_info.plugin, "lynPostProcess", post_para, len(post_para)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        ret = sdk.lyn_stream_add_async_callback(
            model_info.plugin_stream,
            free_to_pool_callback,
            [model_info.apu_output_mem_pool, apu_output_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def merge_plugin_process(self, cb_data):
        """Merge all models' boxes and draw them onto the decoded frame.

        Both plugin calls run on the last model's plugin stream.

        Raises:
            ValueError: If more than ``MAX_MERGE_SOURCES`` models are loaded.
        """
        if len(self.model_infos) > MAX_MERGE_SOURCES:
            raise ValueError(
                f"mergeBoxes supports at most {MAX_MERGE_SOURCES} models, "
                f"got {len(self.model_infos)}"
            )
        plugin = self.model_infos[-1].plugin
        plugin_stream = self.model_infos[-1].plugin_stream

        # The merge reads every model's boxes_info, but the other models'
        # post-processing runs on their own plugin streams. Order the merge
        # after each of them.
        for model_info in self.model_infos[:-1]:
            ret = sdk.lyn_record_event(model_info.plugin_stream, model_info.plugin_event)
            common.error_check(ret, "lyn_record_event")
            ret = sdk.lyn_stream_wait_event(plugin_stream, model_info.plugin_event)
            common.error_check(ret, "lyn_stream_wait_event")

        target_ptr = _capsule_address(self.total_boxes_info)
        sources_ptrs = [
            _capsule_address(model_info.boxes_info) for model_info in self.model_infos
        ]
        # Unused source slots are passed as null pointers.
        sources_ptrs += [0] * (MAX_MERGE_SOURCES - len(sources_ptrs))
        box_merge_params = struct.pack(
            "P4PH",  # target pointer + 4 source pointers + source count
            target_ptr,
            *sources_ptrs,
            len(self.model_infos),
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream, plugin, "mergeBoxes", box_merge_params, len(box_merge_params)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        pixel_format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        draw_para = struct.pack(
            'P2IiP',
            target_ptr,
            self.codec_para.width,
            self.codec_para.height,
            pixel_format,
            _capsule_address(cb_data.frame.data),
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream,
            plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

    def _make_encode_request(self, eos=None):
        """Build an encoder output packet and its ``save_file_cb`` payload.

        Args:
            eos: When not ``None``, stored as the packet's ``eos`` flag.

        Returns:
            ``(enc_packet, encode_data)``.
        """
        enc_packet = sdk.lyn_packet_t()
        enc_packet.size = self.vdec_out_info.predict_buf_size
        if eos is not None:
            enc_packet.eos = eos
        enc_packet.data = self.venc_recv_pool.pop()

        encode_data = save_file_cb_data()
        encode_data.output_path = self.attr.output_path
        encode_data.packet = enc_packet
        encode_data.recv_pool = self.venc_recv_pool
        encode_data.file_path = self.attr.output_path
        encode_data.video_frame = self.attr.video_frame
        return enc_packet, encode_data

    def encode_process(self, cb_data: cb_data):
        """Queue H.264 encoding of the annotated frame.

        On the first call the encoder parameter sets are fetched as well.
        Every encoded packet is delivered through ``save_file_cb``.
        """
        # Encode only after the plugin stage has finished.
        ret = sdk.lyn_record_event(self.model_infos[-1].plugin_stream, self.plugin_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event)
        common.error_check(ret, "lyn_stream_wait_event")

        if self.enc_head_flag:
            # First frame: fetch the parameter sets ahead of any frame data.
            self.enc_head_flag = False
            enc_packet, encode_data = self._make_encode_request()
            ret = sdk.lyn_venc_get_paramsset_async(
                self.venc_recv_stream, self.venc_handle, enc_packet
            )
            common.error_check(ret, "lyn_venc_get_paramsset_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.venc_recv_stream, save_file_cb, encode_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_venc_sendframe_async(
            self.venc_send_stream, self.venc_handle, cb_data.frame
        )
        common.error_check(ret, "lyn_venc_sendframe_async")

        enc_packet, encode_data = self._make_encode_request(eos=cb_data.frame.eos)
        # The decoded frame goes back to its pool once the encoder has it.
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_send_stream,
            free_to_pool_callback,
            [cb_data.frame_pool, cb_data.frame.data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")
        ret = sdk.lyn_venc_recvpacket_async(
            self.venc_recv_stream, self.venc_handle, enc_packet
        )
        common.error_check(ret, "lyn_venc_recvpacket_async")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream, save_file_cb, encode_data
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def run(self, cancel_flag):
        """Start the send, receive and IPE/inference worker threads."""
        # Send thread.
        self.__send_thread = threading.Thread(
            target=self.send_thread_func, args=(cancel_flag,)
        )
        self.__send_thread.start()

        # Receive thread.
        self.__recv_thread = threading.Thread(
            target=self.recv_thread_func, args=(cancel_flag,)
        )
        self.__recv_thread.start()

        # Image processing and inference thread.
        self.__ipe_thread = threading.Thread(
            target=self.ipe_thread_func, args=(cancel_flag,)
        )
        self.__ipe_thread.start()

    def close(self):
        """Wait for the workers, drain all streams and release SDK resources."""
        for worker in (self.__send_thread, self.__recv_thread, self.__ipe_thread):
            # The attributes hold "" until run() has created the threads.
            if isinstance(worker, threading.Thread) and worker.is_alive():
                worker.join()

        # The APU/plugin streams and events belong to the ModelInfo objects.
        streams = [self.send_stream, self.recv_stream, self.ipe_stream]
        for model_info in self.model_infos:
            streams.extend((model_info.apu_stream, model_info.plugin_stream))
        streams.extend((self.venc_recv_stream, self.venc_send_stream))

        for stream in streams:
            ret = sdk.lyn_synchronize_stream(stream)
            common.error_check(ret, "lyn_synchronize_stream")
        for stream in streams:
            ret = sdk.lyn_destroy_stream(stream)
            common.error_check(ret, "lyn_destroy_stream")

        events = [self.ipe_event, self.plugin_event]
        for model_info in self.model_infos:
            events.extend((model_info.apu_event, model_info.plugin_event))
        for event in events:
            ret = sdk.lyn_destroy_event(event)
            common.error_check(ret, "lyn_destroy_event")

        # Destroy the IPE descriptors and config.
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_destroy_config_desc")

        for model_info in self.model_infos:
            ret = sdk.lyn_plugin_unregister(model_info.plugin)
            common.error_check(ret, "lyn_plugin_unregister")

        # Unload the models.
        for model_info in self.model_infos:
            ret = sdk.lyn_unload_model(model_info.model)
            common.error_check(ret, "lyn_unload_model")

        # NOTE(review): self.venc_handle and the lyn_malloc'ed box buffers are
        # not released here - confirm the matching SDK release calls.
        if self.__demux_hdl != "":
            sdk.lyn_demux_close(self.__demux_hdl)
        if self.__vdec_hdl != "":
            ret = sdk.lyn_vdec_close(self.__vdec_hdl)
            common.error_check(ret, "lyn_vdec_close")
        if self.__ctx != "":
            ret = sdk.lyn_destroy_context(self.__ctx)
            common.error_check(ret, "lyn_destroy_context")


def main(args, video_frame, device_id: int, channel_id: int) -> None:
    """Run one channel to completion.

    Args:
        args: Namespace providing ``input_path``, ``plugin_path`` and
            ``model_path``.
        video_frame: Queue that receives the encoded packets.
        device_id: Device the context is created on.
        channel_id: Channel id used in log output.

    Raises:
        ValueError: If ``args.input_path`` is empty.
    """
    attr = infer_process_attr()
    attr.url = args.input_path
    attr.device_id = device_id
    attr.chan_id = channel_id
    attr.plugin_path = args.plugin_path
    attr.model_path = args.model_path
    attr.video_frame = video_frame
    if not attr.url:
        raise ValueError('input file miss!!!')

    decoder = yolov5(attr)
    decoder.run(cancel_flag)
    decoder.close()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, cancel_process)

    parser = argparse.ArgumentParser(description="RTSP stream input and output paths")
    parser.add_argument('--input_path', type=str, required=True, help='Input path (e.g., RTSP or file path)')
    parser.add_argument('--output_path', type=str, required=True, help='Output path (e.g., RTSP output path)')
    args = parser.parse_args()
    args.plugin_path = 'plugins/libYolov5Plugin.so'
    args.model_path = 'models/yolov5s/Net_0'

    init_output_process(args.output_path)
    if stream_out_process is None:
        # Without ffmpeg there is nowhere to send the stream.
        sys.exit(1)

    device_id = 0
    channel_id = 1
    video_frame = queue.Queue(10)

    channel_thread = threading.Thread(target=main, args=(args, video_frame, device_id, channel_id))
    channel_thread.start()

    push_thread = threading.Thread(target=push_video, args=(video_frame, cancel_flag))
    push_thread.start()