from blockqueue import block_queue from callback_data_struct import * import common import bufferpool import math import queue import sys, os from types import SimpleNamespace from typing import List, Tuple import time import numpy as np import pylynchipsdk as sdk import copy import threading import multiprocessing import signal import sys import json import ctypes from ctypes import * import struct import subprocess import argparse # import ffmpeg try: import cv2 except ImportError: cv2 = None window_created = False grid = None stream_out_process = None def cancel_process(signum, frame): global cancel_flag cancel_flag.value = True cancel_flag = multiprocessing.Value('b', False) class bbox(ctypes.Structure): _fields_ = [ ("xmin", ctypes.c_uint32), ("ymin", ctypes.c_uint32), ("xmax", ctypes.c_uint32), ("ymax", ctypes.c_uint32), ("score", ctypes.c_float), ("label", ctypes.c_wchar * 64), ] class Box(ctypes.Structure): _fields_ = [("boxesnum", ctypes.c_uint32), ("boxes", ctypes.ARRAY(bbox, 256))] def recv_frame_cb(cb_data: cb_data): # 放到ipe的处理队列中 cb_data.block_queue.put(cb_data) return 0 def free_to_pool_callback(params): vdec_out_pool = params[0] data = params[1] vdec_out_pool.push(data) return 0 def set_even(num): num = math.floor(num) if num % 2 != 0: res = num - 1 else: res = num return res def set_padding_data(vidoe_width, video_height, model_width, model_height): if vidoe_width > video_height: resize_width = model_width pad_x = 0 resize_height = set_even(int(video_height * model_width / vidoe_width)) pad_y = (model_height - resize_height) / 2 else: resize_height = model_height pad_y = 0 resize_width = set_even(int(vidoe_width * model_width / video_height)) pad_x = (model_width - resize_width) / 2 return int(resize_width), int(resize_height), int(pad_x), int(pad_y) def show_video_cb(cb_data: cb_data): data = np.empty(cb_data.frame.size, dtype=np.uint8) data_ptr = sdk.lyn_numpy_to_ptr(data) ret = sdk.lyn_memcpy( data_ptr, cb_data.frame.data, cb_data.frame.size, sdk.lyn_memcpy_dir_t.ServerToClient, ) common.error_check(ret, "lyn_memcpy") yuvImg = np.reshape( data, (cb_data.attr.height * 3 // 2, cb_data.attr.width) ).astype(np.uint8) import cv2 rgbImg = cv2.cvtColor(yuvImg, cv2.COLOR_YUV2BGR_NV12) if cb_data.frame.eos: cb_data.attr.video_frame.put([rgbImg, cb_data.frame.eos]) else: cb_data.attr.video_frame.queue.clear() cb_data.attr.video_frame.put([rgbImg, cb_data.frame.eos]) cb_data.frame_pool.push(cb_data.frame.data) return 0 def save_file_cb(cb_data: save_file_cb_data): packet = cb_data.packet data_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(packet) common.error_check(ret, "lyn_enc_get_remote_packet_valid_size") data = np.zeros(data_size, dtype=np.byte) data_ptr = sdk.lyn_numpy_to_ptr(data) ret = sdk.lyn_memcpy( data_ptr, packet.data, data_size, sdk.lyn_memcpy_dir_t.ServerToClient ) common.error_check(ret, "lyn_memcpy") cb_data.video_frame.put([data, packet.eos]) cb_data.recv_pool.push(packet.data) # with open(cb_data.output_path, "ab") as f: # f.write(data.tobytes()) return ret def init_output_process(output_path): global stream_out_process global stream_out_process # command = ['ffmpeg', # '-re', # # 'rtsp_transport', 'tcp', # '-f', 'rawvideo', # '-vcodec', 'rawvideo', # '-pix_fmt', 'bgr24', # '-s', "{}x{}".format(640, 360), # '-r', str(25), # '-i', '-', # '-c:v', 'libx264', # # '-pix_fmt', 'yuv420p', # '-preset', 'ultrafast', # '-tune', 'zerolatency', # '-b:v', '2000k', # 控制码率 # '-maxrate', '2000k', # '-bufsize', '4000k', # 设置缓冲区 # '-f', 'rtsp', # '-g', '10', # output_path] command = ['ffmpeg', '-re', "-f", "h264", '-i', '-', "-c:v", "copy", '-r', str(25), '-preset', 'ultrafast', '-tune', 'zerolatency', '-b:v', '2000k', # 控制码率 '-maxrate', '2000k', '-bufsize', '4000k', # 设置缓冲区 '-f', 'rtsp', '-rtsp_transport', 'tcp', '-g', '10', output_path] # 启动FFmpeg子进程 try: stream_out_process = subprocess.Popen(command, stdin=subprocess.PIPE, stderr=subprocess.PIPE) except OSError as e: print(f"Failed to start process: {e}") except subprocess.SubprocessError as e: print(f"Subprocess error: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") # stream_out_process = ( # ffmpeg # .input('pipe:', format='rawvideo', pix_fmt='bgr24', s='640x360', r=25) # .output(output_path, vcodec='libx264', # preset='ultrafast', tune='zerolatency', # bitrate='1000k',maxrate='1000k', bufsize='2000k', f='rtsp',g=10) # .run_async(pipe_stdin=True, pipe_stderr=True,overwrite_output=True, cmd=['ffmpeg', '-report']) # ) def read_stderr(process): for line in iter(process.stderr.readline, b''): print(f"stderr: {line.decode('utf-8')}", end='') if stream_out_process: stderr_thread = threading.Thread(target=read_stderr, args=(stream_out_process,)) stderr_thread.daemon = True stderr_thread.start() def push_video(video_frame, cancel_flag): # show window global stream_out_process while True: try: frame = video_frame.get(False) except: if cancel_flag.value: sys.exit() else: continue if frame[1] or cancel_flag.value: # cv2.destroyAllWindows() sys.exit() image = frame[0] # resized_image = cv2.resize(image, (640, 360)) stream_out_process.stdin.write(image.tobytes()) class yolov5: def __init__(self, attr: infer_process_attr) -> None: self.attr = "" self.__ctx = "" self.__send_thread = "" self.__recv_thread = "" self.__ipe_thread = "" self.__demux_hdl = "" self.codec_para = "" self.__vdec_hdl = "" self.__vdec_attr = sdk.lyn_vdec_attr_t() self.__send_queue = block_queue() self.__ipe_queue = block_queue() self.__send_num = 0 self.__recv_num = 0 # 创建上下文环境 self.attr = copy.copy(attr) self.video_frame = attr.video_frame self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id) self.enc_head_flag = True common.error_check(ret, "create context") ret = sdk.lyn_register_error_handler(common.default_stream_error_handle) common.error_check(ret, 'lyn_register_error_handler') if attr.output_path and os.path.exists(attr.output_path): os.remove(attr.output_path) # 打开解封装器 self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url) common.error_check(ret, "lyn_demux_open ") self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl) common.error_check(ret, "lyn_demux_get_codec_para ") # 打开解码器 self.__vdec_attr.codec_id = self.codec_para.codec_id self.__vdec_attr.output_fmt = self.attr.output_fmt self.__vdec_attr.scale = self.attr.scale self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr) common.error_check(ret, "lyn_vdec_open ") self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info( self.codec_para, self.__vdec_attr ) print(f'self.vdec_out_info.width = {self.vdec_out_info.width}') print(f'self.vdec_out_info.height = {self.vdec_out_info.height}') common.error_check(ret, "lyn_vdec_get_out_info ") # 注册plugin if self.attr.plugin_path: self.plugin, ret = sdk.lyn_plugin_register(self.attr.plugin_path) common.error_check(ret, "lyn_plugin_register") # 打开模型 self.__model, ret = sdk.lyn_load_model(self.attr.model_path) common.error_check(ret, "lyn_load_model") # 创建stream、event和ipe desc self.send_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.recv_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.ipe_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.apu_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.plugin_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.venc_recv_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.venc_send_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.ipe_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.apu_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.plugin_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc() common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc") self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc() common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc") self.ipe_config, ret = sdk.lyn_ipe_create_config_desc() common.error_check(ret, "lyn_ipe_create_config_desc") ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc) common.error_check(ret, "lyn_ipe_reset_pic_desc") ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc) common.error_check(ret, "lyn_ipe_reset_pic_desc") ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config) common.error_check(ret, "lyn_ipe_reset_config_desc") # 获取模型信息 self.model_desc, ret = sdk.lyn_model_get_desc(self.__model) common.error_check(ret, "lyn_model_get_desc") self.batch_size = self.model_desc.inputTensorAttrArray[0].batchSize self.model_width = self.model_desc.inputTensorAttrArray[0].dims[2] self.model_height = self.model_desc.inputTensorAttrArray[0].dims[1] print(f'self.batch_size = {self.batch_size}') print(f'self.model_width = {self.model_width}') print(f'self.model_height = {self.model_height}') self.ipe_output_size = ( self.model_width * self.model_height * self.model_desc.inputTensorAttrArray[0].dims[3] ) self.apu_output_size = self.model_desc.outputDataLen ( self.resize_width, self.resize_height, self.pad_x, self.pad_y, ) = set_padding_data( self.vdec_out_info.width, self.vdec_out_info.height, self.model_width, self.model_height, ) self.attr.width = self.vdec_out_info.width self.attr.height = self.vdec_out_info.height # 创建对象池 self.ipe_output_mem_pool = bufferpool.buffer_pool( self.ipe_output_size * self.batch_size, 5 ) self.apu_output_mem_pool = bufferpool.buffer_pool( self.apu_output_size * self.batch_size, 5 ) self.venc_recv_pool = bufferpool.buffer_pool( self.vdec_out_info.predict_buf_size, 5 ) print(f'self.ipe_output_size = {self.ipe_output_size}') print(f'self.apu_output_size = {self.apu_output_size}') print(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}') # 设置编码参数 self.venc_attr = sdk.lyn_venc_attr_t() ret = sdk.lyn_venc_set_default_params(self.venc_attr) common.error_check(ret, "lyn_venc_set_default_params") self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264 self.venc_attr.width = self.vdec_out_info.width self.venc_attr.height = self.vdec_out_info.height self.venc_attr.bit_depth = 8 self.venc_attr.bframes_num = 0 self.venc_attr.pframes_num = 5 self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12 self.venc_attr.target_bitrate = 6000000 self.venc_attr.level = -1 self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr) common.error_check(ret, "lyn_vdec_open") # yolov5 self.class_num = self.model_desc.outputTensorAttrArray[0].dims[3] - 5 self.anchor_size = self.model_desc.outputTensorAttrArray[0].dims[1] self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box)) def send_thread_func(self, cancel_flag): # 设置上下文环境 创建发送stream sdk.lyn_set_current_context(self.__ctx) eos = False while not eos: # 从解封装器读取一个包 pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl) eos = pkt.eos if ret != 0 and not eos: sdk.lyn_demux_close(self.__demux_hdl) time.sleep(500.0 / 1000) print("demux failed, reconnecting...") self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url) continue # 发送给解码器解码 ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt) common.error_check(ret, "lyn_vdec_send_packet_async") ret = sdk.lyn_synchronize_stream(self.send_stream) common.error_check(ret, "lyn_synchronize_stream") # 释放packet内存并通知接收结果 if not eos: sdk.lyn_demux_free_packet(pkt) self.__send_num += 1 self.__send_queue.put(self.__send_num) else: self.__send_queue.put(-1) if cancel_flag.value: return def recv_thread_func(self, cancel_flag): # 设置上下文环境 创建接收stream sdk.lyn_set_current_context(self.__ctx) frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5) while self.__recv_num >= 0: self.__recv_num = self.__send_queue.take() cb_data = recv_cb_data() cb_data.frame.eos = self.__recv_num < 0 cb_data.frame.data = frame_pool.pop() cb_data.frame.size = self.vdec_out_info.predict_buf_size cb_data.frame_pool = frame_pool cb_data.send_num = self.__send_num cb_data.recv_num = self.__recv_num cb_data.attr = self.attr cb_data.block_queue = self.__ipe_queue cb_data.video_frame = self.video_frame # 插入接收指令,并添加接收完成回调函数 ret = sdk.lyn_vdec_recv_frame_async( self.recv_stream, self.__vdec_hdl, cb_data.frame ) common.error_check(ret, "lyn_vdec_recv_frame_async") ret = sdk.lyn_stream_add_async_callback( self.recv_stream, recv_frame_cb, cb_data ) common.error_check(ret, "lyn_stream_add_async_callback") if cancel_flag.value: return def ipe_thread_func(self, cancel_flag): # 设置上下文环境 创建ipe stream sdk.lyn_set_current_context(self.__ctx) eos = False while not eos: cb_data = self.__ipe_queue.take() eos = cb_data.frame.eos ipe_out_data = self.ipe_process(cb_data) apu_output_data = self.apu_process(ipe_out_data) if self.attr.plugin_path: self.plugin_process(apu_output_data, cb_data) self.encode_process(cb_data) # ret = sdk.lyn_stream_add_async_callback( # self.plugin_stream, show_video_cb, cb_data # ) # common.error_check(ret, "lyn_stream_add_async_callback") if cancel_flag.value: return def ipe_process(self, cb_data: framepool_cb_data): sdk.lyn_set_current_context(self.__ctx) ipe_out_data = self.ipe_output_mem_pool.pop() # 设置ipe输入 ret = sdk.lyn_ipe_set_input_pic_desc( self.ipe_input_desc, cb_data.frame.data, self.vdec_out_info.width, self.vdec_out_info.height, self.__vdec_attr.output_fmt, ) common.error_check(ret, "lyn_ipe_set_input_pic_desc") ret = sdk.lyn_ipe_set_output_pic_data(self.ipe_output_desc, ipe_out_data) common.error_check(ret, "lyn_ipe_set_output_pic_data") ret = sdk.lyn_ipe_set_resize_config( self.ipe_config, self.resize_width, self.resize_height ) common.error_check(ret, "lyn_ipe_set_resize_config") ret = sdk.lyn_ipe_set_pad_config( self.ipe_config, self.pad_y, self.pad_x, self.pad_y, self.pad_x, 114, 114, 114, ) common.error_check(ret, "lyn_ipe_set_pad_config") ret = sdk.lyn_ipe_set_c2c_config( self.ipe_config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0 ) common.error_check(ret, "lyn_ipe_set_c2c_config") ret = sdk.lyn_ipe_cal_output_pic_desc( self.ipe_output_desc, self.ipe_input_desc, self.ipe_config, 0 ) common.error_check(ret, "lyn_ipe_cal_output_pic_desc") ret = sdk.lyn_ipe_process_async( self.ipe_stream, self.ipe_input_desc, self.ipe_output_desc, self.ipe_config ) common.error_check(ret, "lyn_ipe_process_async") return ipe_out_data def get_channel_info(self) -> str: return f'{self.attr.device_id}_{self.attr.chan_id}' def apu_process(self, ipe_out_data): # 等待IPE处理完成 ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event) common.error_check(ret, "lyn_record_event") ret = sdk.lyn_stream_wait_event(self.apu_stream, self.ipe_event) common.error_check(ret, "lyn_stream_wait_event") apu_output_data = self.apu_output_mem_pool.pop() ret = sdk.lyn_execute_model_async( self.apu_stream, self.__model, ipe_out_data, apu_output_data, self.batch_size, ) common.error_check(ret, "lyn_execute_model_async") ret = sdk.lyn_stream_add_async_callback( self.apu_stream, free_to_pool_callback, [self.ipe_output_mem_pool, ipe_out_data], ) common.error_check(ret, "lyn_stream_add_async_callback") common.print_frame_rate(self.get_channel_info()) return apu_output_data def plugin_process(self, apu_output_data, cb_data): # 等待apu处理完成 ret = sdk.lyn_record_event(self.apu_stream, self.apu_event) common.error_check(ret, "lyn_record_event") ret = sdk.lyn_stream_wait_event(self.plugin_stream, self.apu_event) common.error_check(ret, "lyn_stream_wait_event") format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12) pythonapi.PyCapsule_GetPointer.restype = c_void_p pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p] apu_data_ptr = pythonapi.PyCapsule_GetPointer(apu_output_data, None) frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None) boxes_info_ptr = pythonapi.PyCapsule_GetPointer(self.boxes_info, None) post_para = struct.pack( '6IH2f?2P', self.codec_para.width, self.codec_para.height, self.model_width, self.model_height, self.class_num, 500, # nmsTopK self.anchor_size, 0.25, # score threashold 0.45, # nms threshold True, # is pad resize apu_data_ptr, boxes_info_ptr, ) ret = sdk.lyn_plugin_run_async( self.plugin_stream, self.plugin, "lynPostProcess", post_para, len(post_para) ) common.error_check(ret, "lyn_plugin_run_async") draw_para = struct.pack( 'P2IiP', boxes_info_ptr, self.codec_para.width, self.codec_para.height, format, frame_data_ptr, ) ret = sdk.lyn_plugin_run_async( self.plugin_stream, self.plugin, "lynDrawBoxAndText", draw_para, len(draw_para), ) common.error_check(ret, "lyn_plugin_run_async") ret = sdk.lyn_stream_add_async_callback( self.plugin_stream, free_to_pool_callback, [self.apu_output_mem_pool, apu_output_data], ) common.error_check(ret, "lyn_stream_add_async_callback") def encode_process(self, cb_data: cb_data): # 在 plugin 处理完之后,进行编码操作 ret = sdk.lyn_record_event(self.plugin_stream, self.plugin_event) common.error_check(ret, "lyn_record_event") ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event) common.error_check(ret, "lyn_stream_wait_event") if self.enc_head_flag: # 第一帧编码 self.enc_head_flag = False enc_packet = sdk.lyn_packet_t() enc_packet.size = self.vdec_out_info.predict_buf_size enc_packet.data = self.venc_recv_pool.pop() encode_data = save_file_cb_data() encode_data.output_path = self.attr.output_path encode_data.packet = enc_packet encode_data.recv_pool = self.venc_recv_pool encode_data.file_path = self.attr.output_path encode_data.video_frame = self.attr.video_frame ret = sdk.lyn_venc_get_paramsset_async( self.venc_recv_stream, self.venc_handle, enc_packet ) common.error_check(ret, "lyn_venc_get_paramsset_async") ret = sdk.lyn_stream_add_async_callback( self.venc_recv_stream, save_file_cb, encode_data ) common.error_check(ret, "lyn_stream_add_async_callback") ret = sdk.lyn_venc_sendframe_async( self.venc_send_stream, self.venc_handle, cb_data.frame ) common.error_check(ret, "lyn_venc_sendframe_async") enc_packet = sdk.lyn_packet_t() enc_packet.size = self.vdec_out_info.predict_buf_size enc_packet.eos = cb_data.frame.eos enc_packet.data = self.venc_recv_pool.pop() encode_data = save_file_cb_data() encode_data.packet = enc_packet encode_data.recv_pool = self.venc_recv_pool encode_data.file_path = self.attr.output_path encode_data.output_path = self.attr.output_path encode_data.video_frame = self.attr.video_frame ret = sdk.lyn_stream_add_async_callback( self.venc_send_stream, free_to_pool_callback, [cb_data.frame_pool, cb_data.frame.data], ) common.error_check(ret, "lyn_stream_add_async_callback") ret = sdk.lyn_venc_recvpacket_async( self.venc_recv_stream, self.venc_handle, enc_packet ) common.error_check(ret, "lyn_venc_recvpacket_async") ret = sdk.lyn_stream_add_async_callback( self.venc_recv_stream, save_file_cb, encode_data ) common.error_check(ret, "lyn_stream_add_async_callback") def app_process(self, cb_data: cb_data): ret = sdk.lyn_record_event(self.plugin_stream, self.plugin_event) common.error_check(ret, "lyn_record_event") ret = sdk.lyn_stream_wait_event(self.app_stream, self.plugin_event) common.error_check(ret, "lyn_stream_wait_event") if self.attr.show_type == 0: ret = sdk.lyn_stream_add_async_callback( self.plugin_stream, show_video_cb, cb_data ) common.error_check(ret, "lyn_stream_add_async_callback") elif self.attr.show_type == 1: self.encode_process(cb_data) def run(self, cancel_flag): # 开启发送线程 self.__send_thread = threading.Thread( target=self.send_thread_func, args=(cancel_flag,) ) self.__send_thread.start() # 开启接收线程 self.__recv_thread = threading.Thread( target=self.recv_thread_func, args=(cancel_flag,) ) self.__recv_thread.start() # 开启图像处理和推理线程 self.__ipe_thread = threading.Thread( target=self.ipe_thread_func, args=(cancel_flag,) ) self.__ipe_thread.start() def close(self): if self.__send_thread.is_alive(): self.__send_thread.join() if self.__recv_thread.is_alive(): self.__recv_thread.join() if self.__ipe_thread.is_alive(): self.__ipe_thread.join() ret = sdk.lyn_synchronize_stream(self.send_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.recv_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.ipe_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.apu_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.plugin_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.venc_recv_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.venc_send_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_destroy_stream(self.send_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.recv_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.ipe_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.apu_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.plugin_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.venc_recv_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.venc_send_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_event(self.ipe_event) common.error_check(ret, "lyn_destroy_event") ret = sdk.lyn_destroy_event(self.apu_event) common.error_check(ret, "lyn_destroy_event") # destory ipe desc and config ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc) common.error_check(ret, "lyn_ipe_destroy_pic_desc") ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc) common.error_check(ret, "lyn_ipe_destroy_pic_desc") ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config) common.error_check(ret, "lyn_ipe_destroy_config_desc") ret = sdk.lyn_plugin_unregister(self.plugin) common.error_check(ret, "lyn_plugin_unregister") # 卸载模型 ret = sdk.lyn_unload_model(self.__model) common.error_check(ret, "lyn_unload_model") if self.__vdec_hdl != "": ret = sdk.lyn_vdec_close(self.__vdec_hdl) common.error_check(ret, "lyn_vdec_close") if self.__ctx != "": ret = sdk.lyn_destroy_context(self.__ctx) common.error_check(ret, "lyn_destroy_context") def main(args, video_frame, device_id: int, channel_id: int) -> None: attr = infer_process_attr() attr.url = args.input_path attr.output_path = args.output_path attr.device_id = device_id attr.chan_id = channel_id attr.plugin_path = args.plugin_path attr.model_path = args.model_path attr.video_frame = video_frame if not attr.url: raise ValueError('input file miss!!!') if not attr.output_path: raise ValueError('unspecified output path!!!') decoder = yolov5(attr) decoder.run(cancel_flag) decoder.close() if __name__ == "__main__": signal.signal(signal.SIGINT, cancel_process) parser = argparse.ArgumentParser(description="RTSP stream input and output paths") parser.add_argument('--input_path', type=str, required=True, help='Input path (e.g., RTSP or file path)') parser.add_argument('--output_path', type=str, required=True, help='Output path (e.g., RTSP output path)') args = parser.parse_args() # args = SimpleNamespace() # args.input_path = 'rtsp://192.168.10.137:554/11' # # args.input_path = '/usr/local/lynxi/sdk/sdk-samples/data/MOT16-09.mp4' # args.output_path = 'rtsp://192.168.1.100:8554/live?rtptransport=udp' # args.plugin_path = 'plugins/libYolov5Plugin.so' # args.model_path = 'models/yolov5s/Net_0' args.plugin_path = 'plugins/yuntai_3.so' args.model_path = 'models/yuntai/Net_0' init_output_process(args.output_path) device_id = 0 channel_id = 1 video_frame = queue.Queue(10) channel_thread = threading.Thread(target=main, args=(args, video_frame, device_id, channel_id)) channel_thread.start() push_video(video_frame, cancel_flag)