from blockqueue import block_queue
from callback_data_struct import *
import common
import bufferpool
import math
import queue
import sys, os
from types import SimpleNamespace
from typing import List, Tuple
import time
import numpy as np
import pylynchipsdk as sdk
import copy
import threading
import multiprocessing
import signal
import sys
import json
import ctypes
from ctypes import *
import struct
import subprocess
import argparse

# import ffmpeg
try:
    import cv2
except ImportError:
    cv2 = None


# Shared cancellation flag; raised by cancel_process().
cancel_flag = multiprocessing.Value('b', False)


def cancel_process(signum, frame):
    """Signal handler: raise the shared cancellation flag.

    Args:
        signum: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    cancel_flag.value = True


class bbox(ctypes.Structure):
    """One detection box, as read back from device memory by show_boxinfo_cb.

    NOTE(review): the field layout presumably mirrors the C struct used by the
    post-processing plugin (ctypes.c_wchar is platform dependent) -- confirm
    against the plugin sources.
    """

    _fields_ = [
        ("xmin", ctypes.c_uint32),
        ("ymin", ctypes.c_uint32),
        ("xmax", ctypes.c_uint32),
        ("ymax", ctypes.c_uint32),
        ("score", ctypes.c_float),
        ("id", ctypes.c_uint32),
        ("label", ctypes.c_wchar * 64),
    ]


class Box(ctypes.Structure):
    """Fixed-capacity list of detections: a count followed by 256 bbox slots."""

    _fields_ = [("boxesnum", ctypes.c_uint32), ("boxes", ctypes.ARRAY(bbox, 256))]


class ModelInfo:
    """Loaded model plus the per-model SDK resources used to run it.

    Holds the model and plugin handles, two streams (APU and plugin), two
    events, tensor geometry read from the model description, a device buffer
    sized for one ``Box`` and a pool of APU output buffers.
    """

    def __init__(self, model_path, plugin_path, model_name = '') -> None:
        """Load the model, register its plugin and allocate its resources.

        Args:
            model_path: Path passed to ``sdk.lyn_load_model``.
            plugin_path: Path passed to ``sdk.lyn_plugin_register``.
            model_name: Free-form label, only stored and printed here.
        """
        self.model_path = model_path
        self.plugin_path = plugin_path
        self.model_name = model_name
        print(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}')

        self.model, ret = sdk.lyn_load_model(self.model_path)
        common.error_check(ret, "lyn_load_model")
        self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path)
        common.error_check(ret, "lyn_plugin_register")

        self.apu_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.plugin_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.apu_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.model_desc, ret = sdk.lyn_model_get_desc(self.model)
        common.error_check(ret, "lyn_model_get_desc")

        input_attr = self.model_desc.inputTensorAttrArray[0]
        output_attr = self.model_desc.outputTensorAttrArray[0]
        self.batch_size = input_attr.batchSize
        self.model_width = input_attr.dims[2]
        self.model_height = input_attr.dims[1]
        self.apu_output_size = self.model_desc.outputDataLen
        # NOTE(review): presumably the 5 non-class channels are x, y, w, h and
        # objectness -- confirm against the model export.
        self.class_num = output_attr.dims[3] - 5
        self.anchor_size = output_attr.dims[1]

        # Device buffer receiving this model's post-processed boxes.
        self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc")
        self.apu_output_mem_pool = bufferpool.buffer_pool(
            self.apu_output_size * self.batch_size, 5
        )

        print(f'batch_size = {self.batch_size}')
        print(f'model_width = {self.model_width}')
        print(f'model_height = {self.model_height}')
        print(f'apu_output_size = {self.apu_output_size}')
        print(f'class_num = {self.class_num}')
        print(f'anchor_size = {self.anchor_size}')


def recv_frame_cb(cb_data: cb_data):
    """Stream callback: hand a decoded frame to the IPE processing queue.

    Returns:
        0, always.
    """
    cb_data.block_queue.put(cb_data)
    return 0


def free_to_pool_callback(params):
    """Stream callback: return a buffer to its pool.

    Args:
        params: ``[pool, buffer]``; ``buffer`` is pushed back into ``pool``.

    Returns:
        0, always.
    """
    pool, buffer = params[0], params[1]
    pool.push(buffer)
    return 0


def set_even(num):
    """Floor ``num`` and round it down to the nearest even integer."""
    floored = math.floor(num)
    return floored - (floored % 2)


def set_padding_data(vidoe_width, video_height, model_width, model_height):
    """Compute the aspect-preserving resize and padding for the model input.

    The longer video side is scaled to the model size; the other side is
    scaled proportionally, forced even, and padded symmetrically.

    Args:
        vidoe_width: Source frame width (parameter name kept for callers).
        video_height: Source frame height.
        model_width: Model input width.
        model_height: Model input height.

    Returns:
        ``(resize_width, resize_height, pad_x, pad_y)`` as ints.
    """
    if vidoe_width > video_height:
        resize_width = model_width
        resize_height = set_even(int(video_height * model_width / vidoe_width))
        pad_x = 0
        pad_y = (model_height - resize_height) / 2
    else:
        resize_height = model_height
        # NOTE(review): scales by model_width, which only equals the expected
        # model_height factor for square model inputs -- confirm before using
        # a non-square model.
        resize_width = set_even(int(vidoe_width * model_width / video_height))
        pad_x = (model_width - resize_width) / 2
        pad_y = 0
    return int(resize_width), int(resize_height), int(pad_x), int(pad_y)


def show_video_cb(cb_data: cb_data):
    """Stream callback: copy an NV12 frame to the host and queue it as BGR.

    Only the newest frame is kept for non-EOS frames: stale frames are
    dropped before the new one is queued. The device buffer is returned to
    ``cb_data.frame_pool`` afterwards.

    Returns:
        0, always.

    Raises:
        ImportError: If OpenCV is not installed.
    """
    import cv2

    host_frame = np.empty(cb_data.frame.size, dtype=np.uint8)
    ret = sdk.lyn_memcpy(
        sdk.lyn_numpy_to_ptr(host_frame),
        cb_data.frame.data,
        cb_data.frame.size,
        sdk.lyn_memcpy_dir_t.ServerToClient,
    )
    common.error_check(ret, "lyn_memcpy")

    # NV12 stores a full-resolution Y plane followed by a half-height UV plane.
    yuv_img = np.reshape(
        host_frame, (cb_data.attr.height * 3 // 2, cb_data.attr.width)
    )
    bgr_img = cv2.cvtColor(yuv_img, cv2.COLOR_YUV2BGR_NV12)

    frames = cb_data.attr.video_frame
    if not cb_data.frame.eos:
        # Drop stale frames through the queue API: clearing the underlying
        # deque directly bypasses the queue's lock and races with consumers.
        while True:
            try:
                frames.get_nowait()
            except queue.Empty:
                break
    frames.put([bgr_img, cb_data.frame.eos])

    cb_data.frame_pool.push(cb_data.frame.data)
    return 0


def save_file_cb(cb_data: save_file_cb_data):
    """Stream callback: copy an encoded packet to the host and queue it.

    Queues ``[data, eos]`` on ``cb_data.video_frame`` and returns the device
    packet buffer to ``cb_data.recv_pool``.

    Returns:
        The return code of the device-to-host copy.
    """
    packet = cb_data.packet
    data_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(packet)
    common.error_check(ret, "lyn_enc_get_remote_packet_valid_size")

    data = np.zeros(data_size, dtype=np.byte)
    ret = sdk.lyn_memcpy(
        sdk.lyn_numpy_to_ptr(data),
        packet.data,
        data_size,
        sdk.lyn_memcpy_dir_t.ServerToClient,
    )
    common.error_check(ret, "lyn_memcpy")

    cb_data.video_frame.put([data, packet.eos])
    cb_data.recv_pool.push(packet.data)
    return ret


def show_boxinfo_cb(params):
    """Stream callback: copy a ``Box`` from the device and print a summary.

    Args:
        params: ``[model_name, boxes_info]`` where ``boxes_info`` is the
            device buffer holding a ``Box``.

    Returns:
        0, always.
    """
    model_name, boxes_info = params[0], params[1]

    box_size = ctypes.sizeof(Box)
    # host_buf_arr owns the memory viewed by c_box; it stays alive until return.
    host_buf_arr = np.ones(box_size, dtype=np.uint8)
    host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
    ret = sdk.lyn_memcpy(
        host_buf, boxes_info, box_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "show_boxinfo_cb lyn_memcpy")

    # Without an explicit restype ctypes returns a C int, which truncates
    # 64-bit pointers; do not rely on another function having configured it.
    pythonapi.PyCapsule_GetPointer.restype = c_void_p
    pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
    host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
    c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents
    dump_box_json(model_name, c_box)
    return 0


def dump_box_json(model_name, c_box):
    """Print the model name, the reported box count and the box ids.

    Args:
        model_name: Label printed first.
        c_box: A ``Box`` instance.
    """
    # boxesnum comes from device memory; never index past the fixed capacity.
    valid_count = min(c_box.boxesnum, len(c_box.boxes))
    boxes = []
    for i in range(valid_count):
        entry = c_box.boxes[i]
        boxes.append(
            {
                "xmin": entry.xmin,
                "ymin": entry.ymin,
                "xmax": entry.xmax,
                "ymax": entry.ymax,
                "score": entry.score,
                "id": entry.id,
                "label": entry.label,
            }
        )
    box_dict = {"boxesnum": c_box.boxesnum, "boxes": boxes}
    print(model_name,box_dict['boxesnum'],[box["id"] for box in box_dict["boxes"]])
class yolov5:
    """One video channel: demux, decode, preprocess, infer, draw, encode, push.

    The constructor creates the device context and every SDK resource the
    channel needs, then starts an ffmpeg child process that receives the
    encoded stream on stdin. ``run()`` starts three worker threads (send,
    receive, IPE/inference); ``push_video()`` forwards encoded packets from
    ``video_frame`` to ffmpeg.
    """

    # "mergeBoxes" receives a fixed array of four source pointers ("P4PH").
    MAX_MERGE_SOURCES = 4
    # Upper bound on the rate at which push_video() feeds ffmpeg.
    OUTPUT_FRAME_RATE = 25

    def __init__(self, attr: infer_process_attr) -> None:
        """Create all SDK resources for the channel described by ``attr``.

        Args:
            attr: Channel attributes; this method reads ``device_id``, ``url``,
                ``output_path``, ``output_fmt``, ``scale``, ``video_frame`` and
                ``model_configs``. A shallow copy is kept in ``self.attr``.

        Raises:
            ValueError: If ``model_configs`` is empty or holds more models
                than the box-merge plugin call can take.
        """
        self.attr = copy.copy(attr)
        self.video_frame = attr.video_frame
        self.enc_head_flag = True
        self.start_process_time = None
        self.stream_out_process = None
        self.codec_para = ""
        self.__ctx = ""
        self.__demux_hdl = ""
        self.__vdec_hdl = ""
        self.__send_thread = None
        self.__recv_thread = None
        self.__ipe_thread = None
        self.__vdec_attr = sdk.lyn_vdec_attr_t()
        self.__send_queue = block_queue()
        self.__ipe_queue = block_queue()
        self.__send_num = 0
        self.__recv_num = 0

        model_configs = self.attr.model_configs
        if not model_configs:
            raise ValueError('model_configs must contain at least one model')
        if len(model_configs) > self.MAX_MERGE_SOURCES:
            raise ValueError(
                f'at most {self.MAX_MERGE_SOURCES} models are supported, '
                f'got {len(model_configs)}'
            )

        # Create the device context.
        self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id)
        common.error_check(ret, "create context")
        ret = sdk.lyn_register_error_handler(common.default_stream_error_handle)
        common.error_check(ret, 'lyn_register_error_handler')

        # Open the demuxer.
        self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
        common.error_check(ret, "lyn_demux_open ")
        self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_codec_para ")

        # Open the decoder.
        self.__vdec_attr.codec_id = self.codec_para.codec_id
        self.__vdec_attr.output_fmt = self.attr.output_fmt
        self.__vdec_attr.scale = self.attr.scale
        self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr)
        common.error_check(ret, "lyn_vdec_open ")
        self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info(
            self.codec_para, self.__vdec_attr
        )
        common.error_check(ret, "lyn_vdec_get_out_info ")
        print(f'self.vdec_out_info.width = {self.vdec_out_info.width}')
        print(f'self.vdec_out_info.height = {self.vdec_out_info.height}')

        # Create streams, events and IPE descriptors. The APU and plugin
        # streams are owned by each ModelInfo, not by this object.
        self.send_stream = self._new_stream()
        self.recv_stream = self._new_stream()
        self.ipe_stream = self._new_stream()
        self.venc_recv_stream = self._new_stream()
        self.venc_send_stream = self._new_stream()
        self.ipe_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc")
        self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc")
        self.ipe_config, ret = sdk.lyn_ipe_create_config_desc()
        common.error_check(ret, "lyn_ipe_create_config_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_reset_config_desc")

        # Load the models. Every model is fed the same preprocessed image, so
        # the geometry of the first model is used for preprocessing.
        self.model_infos = [
            ModelInfo(info['model_path'], info['plugin_path'], info['model_name'])
            for info in model_configs
        ]
        self.model_nums = len(self.model_infos)
        first_model = self.model_infos[0]
        self.batch_size = first_model.batch_size
        self.model_width = first_model.model_width
        self.model_height = first_model.model_height
        self.apu_output_size = first_model.apu_output_size
        print(f'self.batch_size = {self.batch_size}')
        print(f'self.model_width = {self.model_width}')
        print(f'self.model_height = {self.model_height}')
        print(f'self.apu_output_size = {self.apu_output_size}')

        self.ipe_output_size = (
            self.model_width
            * self.model_height
            * first_model.model_desc.inputTensorAttrArray[0].dims[3]
        )
        (
            self.resize_width,
            self.resize_height,
            self.pad_x,
            self.pad_y,
        ) = set_padding_data(
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.model_width,
            self.model_height,
        )
        self.attr.width = self.vdec_out_info.width
        self.attr.height = self.vdec_out_info.height

        # Create the buffer pools.
        self.ipe_output_mem_pool = bufferpool.buffer_pool(
            self.ipe_output_size * self.batch_size, 5
        )
        self.venc_recv_pool = bufferpool.buffer_pool(
            self.vdec_out_info.predict_buf_size, 5
        )
        print(f'self.ipe_output_size = {self.ipe_output_size}')
        print(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}')

        # Configure and open the encoder.
        self.venc_attr = sdk.lyn_venc_attr_t()
        ret = sdk.lyn_venc_set_default_params(self.venc_attr)
        common.error_check(ret, "lyn_venc_set_default_params")
        self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264
        self.venc_attr.width = self.vdec_out_info.width
        self.venc_attr.height = self.vdec_out_info.height
        self.venc_attr.bit_depth = 8
        self.venc_attr.bframes_num = 0
        self.venc_attr.pframes_num = 5
        self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12
        self.venc_attr.target_bitrate = 6000000
        self.venc_attr.level = -1
        self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr)
        common.error_check(ret, "lyn_venc_open")

        # Device buffer receiving the merged boxes of all models.
        self.total_boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc")

        self.init_output_process()

    @staticmethod
    def _new_stream():
        """Create one SDK stream and check the return code."""
        stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        return stream

    def init_output_process(self):
        """Start the ffmpeg child that republishes the encoded stream.

        ffmpeg reads the elementary stream from stdin, copies the video codec
        and publishes to ``self.attr.output_path`` over RTSP/TCP. On failure
        the error is printed and ``self.stream_out_process`` stays ``None``.
        A daemon thread echoes ffmpeg's stderr.
        """
        command = [
            'ffmpeg',
            '-i', '-',
            "-c:v", "copy",
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            self.attr.output_path,
        ]

        # Start the ffmpeg child process.
        try:
            self.stream_out_process = subprocess.Popen(
                command, stdin=subprocess.PIPE, stderr=subprocess.PIPE
            )
        except OSError as e:
            print(f"Failed to start process: {e}")
        except subprocess.SubprocessError as e:
            print(f"Subprocess error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

        def read_stderr(process):
            # Decode leniently: if this reader died on a bad byte, ffmpeg
            # would eventually block on a full stderr pipe.
            for line in iter(process.stderr.readline, b''):
                print(f"stderr: {line.decode('utf-8', errors='replace')}", end='')

        if self.stream_out_process:
            stderr_thread = threading.Thread(
                target=read_stderr, args=(self.stream_out_process,)
            )
            stderr_thread.daemon = True
            stderr_thread.start()

    def push_video(self):
        """Forward encoded packets from ``video_frame`` to ffmpeg's stdin.

        Runs until the EOS packet has been forwarded or the global cancel
        flag is raised. Writes are limited to ``OUTPUT_FRAME_RATE`` per
        second. If ffmpeg is unavailable or its pipe breaks, packets are
        still drained so the encoder callback never blocks on a full queue.
        ffmpeg's stdin is closed on exit so it sees end-of-input.
        """
        frame_interval = 1 / self.OUTPUT_FRAME_RATE
        ffmpeg = self.stream_out_process
        next_write_at = time.monotonic()
        try:
            while not cancel_flag.value:
                try:
                    # Blocking wait with a timeout: no busy loop, and the
                    # cancel flag is still polled regularly.
                    data, eos = self.video_frame.get(timeout=0.1)
                except queue.Empty:
                    continue
                if cancel_flag.value:
                    break
                if ffmpeg is not None:
                    delay = next_write_at - time.monotonic()
                    if delay > 0:
                        time.sleep(delay)
                    try:
                        ffmpeg.stdin.write(data.tobytes())
                        ffmpeg.stdin.flush()
                    except OSError as e:
                        print(f"Failed to write to ffmpeg: {e}")
                        ffmpeg = None  # keep draining, stop writing
                    next_write_at = time.monotonic() + frame_interval
                if eos:
                    break
        finally:
            process = self.stream_out_process
            if process is not None and process.stdin is not None:
                try:
                    process.stdin.close()
                except OSError:
                    # ffmpeg already exited; nothing left to flush.
                    pass

    def _reconnect_demux(self, cancel_flag):
        """Close the demuxer and reopen it until success or cancellation.

        Returns:
            True once the demuxer is open again, False if cancelled first.
        """
        sdk.lyn_demux_close(self.__demux_hdl)
        self.__demux_hdl = ""
        while not cancel_flag.value:
            time.sleep(500.0 / 1000)
            print("demux failed, reconnecting...")
            handle, ret = sdk.lyn_demux_open(self.attr.url)
            if ret == 0:
                self.__demux_hdl = handle
                return True
        return False

    def send_thread_func(self, cancel_flag):
        """Worker: read packets from the demuxer and feed the decoder.

        After each packet the running packet count (or -1 at EOS) is put on
        the send queue for the receive thread.

        NOTE(review): when cancelled, this thread returns without posting the
        -1 sentinel, so a receive thread waiting on the queue is not woken --
        confirm how the other workers are expected to stop.
        """
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            # Read one packet from the demuxer.
            pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl)
            eos = pkt.eos
            if ret != 0 and not eos:
                if not self._reconnect_demux(cancel_flag):
                    return
                continue

            # Send it to the decoder.
            ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt)
            common.error_check(ret, "lyn_vdec_send_packet_async")
            ret = sdk.lyn_synchronize_stream(self.send_stream)
            common.error_check(ret, "lyn_synchronize_stream")

            # Release the packet and notify the receive thread.
            if eos:
                self.__send_queue.put(-1)
            else:
                sdk.lyn_demux_free_packet(pkt)
                self.__send_num += 1
                self.__send_queue.put(self.__send_num)

            if cancel_flag.value:
                return

    def recv_thread_func(self, cancel_flag):
        """Worker: issue one decoder receive per sent packet.

        Each decoded frame is delivered to the IPE queue by ``recv_frame_cb``.
        The loop ends after the EOS marker (-1) or on cancellation.
        """
        sdk.lyn_set_current_context(self.__ctx)
        frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5)
        while self.__recv_num >= 0:
            self.__recv_num = self.__send_queue.take()

            cb_data = recv_cb_data()
            cb_data.frame.eos = self.__recv_num < 0
            cb_data.frame.data = frame_pool.pop()
            cb_data.frame.size = self.vdec_out_info.predict_buf_size
            cb_data.frame_pool = frame_pool
            cb_data.send_num = self.__send_num
            cb_data.recv_num = self.__recv_num
            cb_data.attr = self.attr
            cb_data.block_queue = self.__ipe_queue
            cb_data.video_frame = self.video_frame

            # Queue the receive and a callback that runs once it completes.
            ret = sdk.lyn_vdec_recv_frame_async(
                self.recv_stream, self.__vdec_hdl, cb_data.frame
            )
            common.error_check(ret, "lyn_vdec_recv_frame_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.recv_stream, recv_frame_cb, cb_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

            if cancel_flag.value:
                return

    def ipe_thread_func(self, cancel_flag):
        """Worker: preprocess each decoded frame and run the model pipeline."""
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            cb_data = self.__ipe_queue.take()
            eos = cb_data.frame.eos
            self.start_process_time = time.time()
            ipe_out_data = self.ipe_process(cb_data)
            self.model_process(ipe_out_data, cb_data)
            if cancel_flag.value:
                return

    def ipe_process(self, cb_data: framepool_cb_data):
        """Queue the resize, pad and colour conversion of one decoded frame.

        Returns:
            The IPE output buffer taken from ``ipe_output_mem_pool``; the
            work is asynchronous on ``ipe_stream``.
        """
        sdk.lyn_set_current_context(self.__ctx)
        ipe_out_data = self.ipe_output_mem_pool.pop()

        # Describe the IPE input and output pictures.
        ret = sdk.lyn_ipe_set_input_pic_desc(
            self.ipe_input_desc,
            cb_data.frame.data,
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.__vdec_attr.output_fmt,
        )
        common.error_check(ret, "lyn_ipe_set_input_pic_desc")
        ret = sdk.lyn_ipe_set_output_pic_data(self.ipe_output_desc, ipe_out_data)
        common.error_check(ret, "lyn_ipe_set_output_pic_data")

        ret = sdk.lyn_ipe_set_resize_config(
            self.ipe_config, self.resize_width, self.resize_height
        )
        common.error_check(ret, "lyn_ipe_set_resize_config")
        ret = sdk.lyn_ipe_set_pad_config(
            self.ipe_config,
            self.pad_y,
            self.pad_x,
            self.pad_y,
            self.pad_x,
            114,
            114,
            114,
        )
        common.error_check(ret, "lyn_ipe_set_pad_config")
        ret = sdk.lyn_ipe_set_c2c_config(
            self.ipe_config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0
        )
        common.error_check(ret, "lyn_ipe_set_c2c_config")
        ret = sdk.lyn_ipe_cal_output_pic_desc(
            self.ipe_output_desc, self.ipe_input_desc, self.ipe_config, 0
        )
        common.error_check(ret, "lyn_ipe_cal_output_pic_desc")
        ret = sdk.lyn_ipe_process_async(
            self.ipe_stream, self.ipe_input_desc, self.ipe_output_desc, self.ipe_config
        )
        common.error_check(ret, "lyn_ipe_process_async")
        return ipe_out_data

    def get_channel_info(self) -> str:
        """Return ``"<device_id>_<output_path>"`` identifying this channel."""
        return f'{self.attr.device_id}_{self.attr.output_path}'

    def model_process(self, ipe_out_data, cb_data):
        """Run every model on one preprocessed frame, then merge and encode.

        Args:
            ipe_out_data: IPE output buffer shared by all models.
            cb_data: Callback data carrying the decoded frame.
        """
        for model_info in self.model_infos:
            apu_output_data = self.apu_process(model_info, ipe_out_data)
            self.plugin_process(model_info, apu_output_data, cb_data)
        self.merge_plugin_process(cb_data)
        self.encode_process(cb_data)
        common.print_frame_rate(self.get_channel_info())

        # Every model reads ipe_out_data on its own APU stream. The last
        # plugin stream has waited for all of them (see merge_plugin_process),
        # so the buffer is released from there instead of from the last APU
        # stream, which says nothing about the other models.
        ret = sdk.lyn_stream_add_async_callback(
            self.model_infos[-1].plugin_stream,
            free_to_pool_callback,
            [self.ipe_output_mem_pool, ipe_out_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def apu_process(self, model_info, ipe_out_data):
        """Queue inference of one model once the IPE output is ready.

        Returns:
            The APU output buffer taken from the model's pool.
        """
        # Make the model's APU stream wait for the IPE stream.
        ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.apu_stream, self.ipe_event)
        common.error_check(ret, "lyn_stream_wait_event")

        apu_output_data = model_info.apu_output_mem_pool.pop()
        ret = sdk.lyn_execute_model_async(
            model_info.apu_stream,
            model_info.model,
            ipe_out_data,
            apu_output_data,
            self.batch_size,
        )
        common.error_check(ret, "lyn_execute_model_async")
        return apu_output_data

    def plugin_process(self,model_info, apu_output_data, cb_data):
        """Queue the model's ``lynPostProcess`` plugin on its plugin stream.

        The plugin writes its boxes into ``model_info.boxes_info``; the APU
        output buffer is returned to its pool afterwards. ``cb_data`` is
        unused and kept for interface compatibility.
        """
        # Make the plugin stream wait for the APU stream.
        ret = sdk.lyn_record_event(model_info.apu_stream, model_info.apu_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.plugin_stream, model_info.apu_event)
        common.error_check(ret, "lyn_stream_wait_event")

        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        apu_data_ptr = pythonapi.PyCapsule_GetPointer(apu_output_data, None)
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)

        post_para = struct.pack(
            '6IH2f?2P',
            self.codec_para.width,
            self.codec_para.height,
            self.model_width,
            self.model_height,
            model_info.class_num,
            500,  # nmsTopK
            model_info.anchor_size,
            0.25,  # score threshold
            0.45,  # nms threshold
            True,  # is pad resize
            apu_data_ptr,
            boxes_info_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream, model_info.plugin, "lynPostProcess", post_para, len(post_para)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        ret = sdk.lyn_stream_add_async_callback(
            model_info.plugin_stream,
            free_to_pool_callback,
            [model_info.apu_output_mem_pool, apu_output_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def merge_plugin_process(self, cb_data):
        """Merge the boxes of all models and draw them onto the frame.

        Both plugin calls (``mergeBoxes`` and ``lynDrawBoxAndText``) are
        queued on the last model's plugin stream, using its plugin.
        """
        merge_model = self.model_infos[-1]
        plugin = merge_model.plugin
        plugin_stream = merge_model.plugin_stream

        # The merge reads every model's boxes, but the other models are
        # post-processed on their own streams: wait for each of them first.
        for model_info in self.model_infos[:-1]:
            ret = sdk.lyn_record_event(model_info.plugin_stream, model_info.plugin_event)
            common.error_check(ret, "lyn_record_event")
            ret = sdk.lyn_stream_wait_event(plugin_stream, model_info.plugin_event)
            common.error_check(ret, "lyn_stream_wait_event")

        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        target_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        sources_ptrs = [
            pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)
            for model_info in self.model_infos
        ]
        # Pad the unused source slots with NULL pointers.
        sources_ptrs.extend([0] * (self.MAX_MERGE_SOURCES - len(sources_ptrs)))

        box_merge_params = struct.pack(
            "P4PH",  # target pointer + 4 source pointers + source count
            target_ptr,
            *sources_ptrs,
            len(self.model_infos),
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream, plugin, "mergeBoxes", box_merge_params, len(box_merge_params)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        pixel_format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        draw_para = struct.pack(
            'P2IiP',
            target_ptr,
            self.codec_para.width,
            self.codec_para.height,
            pixel_format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream,
            plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

    def _new_encode_request(self, eos=None):
        """Build an output packet and the callback data that delivers it.

        Args:
            eos: When not None, stored in the packet's ``eos`` field.

        Returns:
            ``(packet, save_file_cb_data)``.
        """
        enc_packet = sdk.lyn_packet_t()
        enc_packet.size = self.vdec_out_info.predict_buf_size
        if eos is not None:
            enc_packet.eos = eos
        enc_packet.data = self.venc_recv_pool.pop()

        encode_data = save_file_cb_data()
        encode_data.packet = enc_packet
        encode_data.recv_pool = self.venc_recv_pool
        encode_data.file_path = self.attr.output_path
        encode_data.output_path = self.attr.output_path
        encode_data.video_frame = self.attr.video_frame
        return enc_packet, encode_data

    def encode_process(self, cb_data: cb_data):
        """Queue encoding of the drawn frame and delivery of the packet.

        On the first call the encoder parameter sets are requested before any
        frame. The decoded frame buffer is returned to its pool once the
        encoder send stream has consumed it.
        """
        # Encode only after the plugin stream has finished drawing.
        ret = sdk.lyn_record_event(self.model_infos[-1].plugin_stream, self.plugin_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event)
        common.error_check(ret, "lyn_stream_wait_event")

        if self.enc_head_flag:
            # First frame: fetch the stream header (parameter sets).
            self.enc_head_flag = False
            enc_packet, encode_data = self._new_encode_request()
            ret = sdk.lyn_venc_get_paramsset_async(
                self.venc_recv_stream, self.venc_handle, enc_packet
            )
            common.error_check(ret, "lyn_venc_get_paramsset_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.venc_recv_stream, save_file_cb, encode_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_venc_sendframe_async(
            self.venc_send_stream, self.venc_handle, cb_data.frame
        )
        common.error_check(ret, "lyn_venc_sendframe_async")

        enc_packet, encode_data = self._new_encode_request(eos=cb_data.frame.eos)
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_send_stream,
            free_to_pool_callback,
            [cb_data.frame_pool, cb_data.frame.data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")
        ret = sdk.lyn_venc_recvpacket_async(
            self.venc_recv_stream, self.venc_handle, enc_packet
        )
        common.error_check(ret, "lyn_venc_recvpacket_async")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream, save_file_cb, encode_data
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def run(self):
        """Start the send, receive and IPE/inference worker threads."""
        global cancel_flag
        # Send thread.
        self.__send_thread = threading.Thread(
            target=self.send_thread_func, args=(cancel_flag,)
        )
        self.__send_thread.start()
        # Receive thread.
        self.__recv_thread = threading.Thread(
            target=self.recv_thread_func, args=(cancel_flag,)
        )
        self.__recv_thread.start()
        # Image-processing and inference thread.
        self.__ipe_thread = threading.Thread(
            target=self.ipe_thread_func, args=(cancel_flag,)
        )
        self.__ipe_thread.start()

    def close(self):
        """Wait for the workers, then release the SDK resources.

        Streams, events and plugins that belong to each ``ModelInfo`` are
        released through ``self.model_infos``.

        NOTE(review): the encoder handle, the ``lyn_malloc`` buffers and the
        buffer pools are not released here; the calls needed for that are not
        used anywhere in this file -- confirm against the SDK reference.
        """
        for worker in (self.__send_thread, self.__recv_thread, self.__ipe_thread):
            if worker is not None and worker.is_alive():
                worker.join()

        if self.__ctx != "":
            # close() may run on a thread that never selected this context.
            sdk.lyn_set_current_context(self.__ctx)

        streams = [self.send_stream, self.recv_stream, self.ipe_stream]
        for model_info in self.model_infos:
            streams.append(model_info.apu_stream)
            streams.append(model_info.plugin_stream)
        streams.extend([self.venc_recv_stream, self.venc_send_stream])

        # Drain every stream before destroying any of them.
        for stream in streams:
            ret = sdk.lyn_synchronize_stream(stream)
            common.error_check(ret, "lyn_synchronize_stream")
        for stream in streams:
            ret = sdk.lyn_destroy_stream(stream)
            common.error_check(ret, "lyn_destroy_stream")

        events = [self.ipe_event, self.plugin_event]
        for model_info in self.model_infos:
            events.append(model_info.apu_event)
            events.append(model_info.plugin_event)
        for event in events:
            ret = sdk.lyn_destroy_event(event)
            common.error_check(ret, "lyn_destroy_event")

        # Destroy the IPE descriptors and config.
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_destroy_config_desc")

        # Unregister the plugins and unload the models.
        for model_info in self.model_infos:
            ret = sdk.lyn_plugin_unregister(model_info.plugin)
            common.error_check(ret, "lyn_plugin_unregister")
            ret = sdk.lyn_unload_model(model_info.model)
            common.error_check(ret, "lyn_unload_model")

        if self.__vdec_hdl != "":
            ret = sdk.lyn_vdec_close(self.__vdec_hdl)
            common.error_check(ret, "lyn_vdec_close")
            self.__vdec_hdl = ""
        if self.__demux_hdl != "":
            sdk.lyn_demux_close(self.__demux_hdl)
            self.__demux_hdl = ""
        if self.__ctx != "":
            ret = sdk.lyn_destroy_context(self.__ctx)
            common.error_check(ret, "lyn_destroy_context")
            self.__ctx = ""


def run_device(input_url: str, output_url:str, device_id: int, model_configs, threads) -> None:
    """Create one channel and start its pipeline and output threads.

    Args:
        input_url: Source URL handed to the demuxer.
        output_url: Destination handed to ffmpeg.
        device_id: Device on which the channel's context is created.
        model_configs: List of dicts with ``model_path``, ``plugin_path`` and
            ``model_name``.
        threads: List to which the two started threads are appended.

    Raises:
        ValueError: If ``input_url`` or ``output_url`` is empty.
    """
    if not input_url:
        raise ValueError('input file miss!!!')
    if not output_url:
        raise ValueError('unspecified output path!!!')

    attr = device_process_attr()
    attr.url = input_url
    attr.output_path = output_url
    attr.device_id = device_id
    attr.video_frame = queue.Queue(10)
    attr.model_configs = model_configs

    decoder = yolov5(attr)

    infer_thread = threading.Thread(target=decoder.run, args=())
    threads.append(infer_thread)
    infer_thread.start()

    out_thread = threading.Thread(target=decoder.push_video, args=())
    threads.append(out_thread)
    out_thread.start()
    # NOTE(review): decoder.close() is intentionally not called here (it was
    # disabled in the original); decide where the channel should be released.


if __name__ == "__main__":
    signal.signal(signal.SIGINT, cancel_process)

    # Both channels run the same pair of models.
    model_configs = [
        {
            'model_path': 'models/yolov5s/Net_0',
            'plugin_path': 'plugins/libYolov5Plugin.so',
            'model_name': 'yolov5s',
        },
        {
            'model_path': 'models/yuntai/Net_0',
            'plugin_path': 'plugins/libMultiPlugin.so',
            'model_name': 'yuntai',
        },
    ]
    # NOTE(review): the camera credentials are embedded in the source URL and
    # therefore end up in version control -- move them to configuration or
    # the environment.
    devices = [
        {
            'input_url': 'rtsp://admin:Casic203@192.168.1.64:554/Streaming/Channels/101',
            'output_url': 'rtsp://192.168.1.100:8554/live/101',
            'model_configs': model_configs,
        },
        {
            'input_url': 'rtsp://admin:Casic203@192.168.1.64:554/Streaming/Channels/101',
            'output_url': 'rtsp://192.168.1.100:8554/live/102',
            'model_configs': model_configs,
        },
    ]

    device_count, ret = sdk.lyn_get_device_count()
    if ret != 0 or device_count <= 0:
        print("Error: Unable to fetch device count or no devices available.")
        sys.exit(1)

    threads = []
    for idx, device in enumerate(devices):
        # Spread the channels evenly over the devices; device ids start at 0.
        device_id = idx % device_count
        run_device(device['input_url'],device['output_url'],device_id, device['model_configs'], threads)

    # Wait for all threads to finish.
    for thread in threads:
        thread.join()