import struct import numpy as np import pylynchipsdk as sdk import copy import sys, os import ctypes from ctypes import * import subprocess import threading import multiprocessing import time from global_logger import logger from blockqueue import block_queue from callback_data_struct import * import common import bufferpool from detect_utils import bbox, Box, SubObject, SubClass, free_to_pool_callback, recv_frame_cb, set_padding_data from info_query import ModelObject def save_file_cb(params): cb_data: save_file_cb_data = params[0] frame_cb_data: cb_data = params[1] packet = cb_data.packet data_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(packet) common.error_check(ret, "lyn_enc_get_remote_packet_valid_size") data = np.zeros(data_size, dtype=np.byte) data_ptr = sdk.lyn_numpy_to_ptr(data) ret = sdk.lyn_memcpy( data_ptr, packet.data, data_size, sdk.lyn_memcpy_dir_t.ServerToClient ) common.error_check(ret, "lyn_memcpy") cb_data.video_frame.put([data, packet.eos]) cb_data.recv_pool.push(packet.data) return ret class SceneModelHandler: def __init__(self, model_path, plugin_path, model_name,model_code, objects, image_width, image_height) -> None: self.model_path = model_path self.plugin_path = plugin_path self.model_name = model_name self.model_code = model_code self.objects = objects logger.info(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}') self.model, ret = sdk.lyn_load_model(self.model_path) common.error_check(ret, "lyn_load_model") self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path) common.error_check(ret, "lyn_plugin_register") self.apu_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.plugin_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.apu_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.plugin_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.model_desc, ret = sdk.lyn_model_get_desc(self.model) common.error_check(ret, "lyn_model_get_desc") self.batch_size = self.model_desc.inputTensorAttrArray[0].batchSize self.model_width = self.model_desc.inputTensorAttrArray[0].dims[2] self.model_height = self.model_desc.inputTensorAttrArray[0].dims[1] self.apu_output_size = self.model_desc.outputDataLen self.class_num = self.model_desc.outputTensorAttrArray[0].dims[3] - 5 self.anchor_size = self.model_desc.outputTensorAttrArray[0].dims[1] self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box)) self.sub_objects_info, ret = sdk.lyn_malloc(ctypes.sizeof(SubObject)) sub_obj = sdk.c_malloc(ctypes.sizeof(SubObject)) # 获取底层指针 pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p] pythonapi.PyCapsule_GetPointer.restype = c_void_p raw_ptr = ctypes.pythonapi.PyCapsule_GetPointer(sub_obj, None) if not raw_ptr: raise ValueError("Failed to extract pointer from PyCapsule") # 转换为 ctypes.POINTER(SubObject) sub_obj_ptr = ctypes.cast(raw_ptr, ctypes.POINTER(SubObject)) # 通过 ctypes.POINTER 操作数据 sub_obj_instance = sub_obj_ptr.contents sub_obj_instance.objectsnum = len(self.objects) for i in range(sub_obj_instance.objectsnum): obj = self.objects[i] sub_obj_instance.objects[i].object_id = int(obj.object_code) sub_obj_instance.objects[i].object_name = obj.object_name sub_obj_instance.objects[i].conf_threshold = obj.conf_threshold sub_obj_instance.objects[i].alarm_threshold = obj.alarm_threshold sub_obj_instance.objects[i].model_code = obj.model_code if obj.range: # 转为(x1 y1 x2 y2 x3 y3 x4 y4) abs_range = [] for boundary_point in obj.range: abs_range.append(boundary_point.x * image_width) abs_range.append(boundary_point.y * image_height) logger.debug(f'abs_range = {abs_range}') sub_obj_instance.objects[i].range = (c_float * 8)(*abs_range) else: sub_obj_instance.objects[i].range = (0, 0, 0, 0, 0, 0, 0, 0) ret = sdk.lyn_memcpy( self.sub_objects_info, sub_obj, ctypes.sizeof(SubObject), sdk.lyn_memcpy_dir_t.ClientToServer ) common.error_check(ret, "save_boxinfo_cb lyn_memcpy") self.apu_output_mem_pool = bufferpool.buffer_pool( self.apu_output_size * self.batch_size, 5 ) logger.debug(f'batch_size = {self.batch_size}') logger.debug(f'model_width = {self.model_width}') logger.debug(f'model_height = {self.model_height}') logger.debug(f'apu_output_size = {self.apu_output_size}') logger.debug(f'class_num = {self.class_num}') logger.debug(f'anchor_size = {self.anchor_size}') class BaseSceneHandler: def __init__(self, attr: device_process_attr) -> None: self.attr = "" self.__ctx = "" self.__send_thread = "" self.__recv_thread = "" self.__ipe_thread = "" self.__demux_hdl = "" self.codec_para = "" self.__vdec_hdl = "" self.__vdec_attr = sdk.lyn_vdec_attr_t() self.__send_queue = block_queue() self.__ipe_queue = block_queue() self.__send_num = 0 self.__recv_num = 0 # 创建上下文环境 self.attr = copy.copy(attr) self.video_frame = attr.video_frame self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id) self.enc_head_flag = True common.error_check(ret, "create context") ret = sdk.lyn_register_error_handler(common.default_stream_error_handle) common.error_check(ret, 'lyn_register_error_handler') # 打开解封装器 self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url) common.error_check(ret, "lyn_demux_open ") self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl) common.error_check(ret, "lyn_demux_get_codec_para ") self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl) common.error_check(ret, "lyn_demux_get_framerate") logger.debug(f'fps = {self.fps}') # 打开解码器 self.__vdec_attr.codec_id = self.codec_para.codec_id self.__vdec_attr.output_fmt = self.attr.output_fmt self.__vdec_attr.scale = self.attr.scale self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr) common.error_check(ret, "lyn_vdec_open ") self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info( self.codec_para, self.__vdec_attr ) logger.debug(f'self.vdec_out_info.width = {self.vdec_out_info.width}') logger.debug(f'self.vdec_out_info.height = {self.vdec_out_info.height}') common.error_check(ret, "lyn_vdec_get_out_info ") self.attr.width = self.vdec_out_info.width self.attr.height = self.vdec_out_info.height # 创建stream、event和ipe desc self.send_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.recv_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.ipe_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.venc_recv_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.venc_send_stream, ret = sdk.lyn_create_stream() common.error_check(ret, "lyn_create_stream") self.ipe_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.plugin_event, ret = sdk.lyn_create_event() common.error_check(ret, "lyn_create_event") self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc() common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc") self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc() common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc") self.ipe_config, ret = sdk.lyn_ipe_create_config_desc() common.error_check(ret, "lyn_ipe_create_config_desc") ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc) common.error_check(ret, "lyn_ipe_reset_pic_desc") ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc) common.error_check(ret, "lyn_ipe_reset_pic_desc") ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config) common.error_check(ret, "lyn_ipe_reset_config_desc") # 获取模型信息 scene_model_configs = [] self.model_infos = [] # self.model_infos = [ # SceneModelHandler( # model_plugin= '', # plugin_path='', # model_code='', # model_name='', # objects=[ # ModelObject(object_code='',object_name='',alarm_threshold='',conf_threshold=0.5,model_code=''), # ModelObject(object_code='',object_name='',alarm_threshold='',conf_threshold=0.5,model_code='') # ], # image_width=self.attr.width, # image_height=self.attr.height, # ) # ] self.model_nums = len(self.model_infos) self.batch_size = self.model_infos[0].batch_size if self.model_nums > 0 else 1 self.model_width = self.model_infos[0].model_width if self.model_nums > 0 else 640 self.model_height = self.model_infos[0].model_height if self.model_nums > 0 else 640 # self.apu_output_size = self.model_infos[0].apu_output_size if self.model_nums > 0 else 1 logger.debug(f'self.batch_size = {self.batch_size}') logger.debug(f'self.model_width = {self.model_width}') logger.debug(f'self.model_height = {self.model_height}') # print(f'self.apu_output_size = {self.apu_output_size}') self.ipe_output_size = ( self.model_width * self.model_height * (self.model_infos[0].model_desc.inputTensorAttrArray[0].dims[3] if self.model_nums > 0 else 3) ) ( self.resize_width, self.resize_height, self.pad_x, self.pad_y, ) = set_padding_data( self.vdec_out_info.width, self.vdec_out_info.height, self.model_width, self.model_height, ) # 创建对象池 self.ipe_output_mem_pool = bufferpool.buffer_pool( self.ipe_output_size * self.batch_size, 5 ) self.venc_recv_pool = bufferpool.buffer_pool( self.vdec_out_info.predict_buf_size, 5 ) logger.debug(f'self.ipe_output_size = {self.ipe_output_size}') logger.debug(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}') # 设置编码参数 self.venc_attr = sdk.lyn_venc_attr_t() ret = sdk.lyn_venc_set_default_params(self.venc_attr) common.error_check(ret, "lyn_venc_set_default_params") self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264 self.venc_attr.width = self.vdec_out_info.width self.venc_attr.height = self.vdec_out_info.height self.venc_attr.bit_depth = 8 self.venc_attr.bframes_num = 0 self.venc_attr.pframes_num = 5 self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12 self.venc_attr.target_bitrate = 6000000 self.venc_attr.level = -1 self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr) common.error_check(ret, "lyn_vdec_open") self.total_boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box)) self.start_process_time = None self.last_push_time = None self.stream_out_process = None self.init_output_process() self.frame_step = 5 self.frame_idx = self.frame_step - 1 # 为了处理第0帧,这里不从0开始计数,而是从step-1开始 self.last_alarm_time = {} #key: model_code-objcet_code, value: time def init_output_process(self): command = ['ffmpeg', '-i', '-', "-c:v", "copy", '-f', 'rtsp', '-rtsp_transport', 'tcp', self.attr.output_path] # 启动FFmpeg子进程 try: self.stream_out_process = subprocess.Popen(command, stdin=subprocess.PIPE, stderr=subprocess.PIPE) except OSError as e: logger.exception(f"Failed to start process: {e}") except subprocess.SubprocessError as e: logger.exception(f"Subprocess error: {e}") except Exception as e: logger.exception(f"An unexpected error occurred: {e}") # stream_out_process = ( # ffmpeg # .input('pipe:', format='rawvideo', pix_fmt='bgr24', s='640x360', r=25) # .output(output_path, vcodec='libx264', # preset='ultrafast', tune='zerolatency', # bitrate='1000k',maxrate='1000k', bufsize='2000k', f='rtsp',g=10) # .run_async(pipe_stdin=True, pipe_stderr=True,overwrite_output=True, cmd=['ffmpeg', '-report']) # ) def read_stderr(process): for line in iter(process.stderr.readline, b''): logger.info(f"stderr: {line.decode('utf-8').strip()}") if self.stream_out_process: stderr_thread = threading.Thread(target=read_stderr, args=(self.stream_out_process,)) stderr_thread.daemon = True stderr_thread.start() def push_video(self): # show window global cancel_flag frame_rate = self.fps # 目标帧率 frame_interval = 1 / frame_rate # 每帧间隔时间 if not self.last_push_time: self.last_push_time = time.time() while True: try: frame = self.video_frame.get(False) except: time.sleep(0.01) continue if frame[1]: # cv2.destroyAllWindows() sys.exit() if frame and self.stream_out_process: image = frame[0] # resized_image = cv2.resize(image, (640, 360)) self.stream_out_process.stdin.write(image.tobytes()) # elapsed_time = time.time() - self.start_process_time elapsed_time = time.time() - self.last_push_time sleep_time = frame_interval - elapsed_time - 0.01 # print(f'elapsed_time = {elapsed_time*1000:.3f}ms, sleep_time = {sleep_time*1000:.3f}ms') time.sleep(max(0, sleep_time)) self.last_push_time = time.time() def run(self): # 开启发送线程 self.__send_thread = threading.Thread( target=self.send_thread_func, args=() ) self.__send_thread.start() # 开启接收线程 self.__recv_thread = threading.Thread( target=self.recv_thread_func, args=() ) self.__recv_thread.start() # 开启图像处理和推理线程 self.__ipe_thread = threading.Thread( target=self.ipe_thread_func, args=() ) self.__ipe_thread.start() def close(self): if self.__send_thread.is_alive(): self.__send_thread.join() if self.__recv_thread.is_alive(): self.__recv_thread.join() if self.__ipe_thread.is_alive(): self.__ipe_thread.join() ret = sdk.lyn_synchronize_stream(self.send_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.recv_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.ipe_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.apu_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.plugin_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.venc_recv_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_synchronize_stream(self.venc_send_stream) common.error_check(ret, "lyn_synchronize_stream") ret = sdk.lyn_destroy_stream(self.send_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.recv_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.ipe_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.apu_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.plugin_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.venc_recv_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_stream(self.venc_send_stream) common.error_check(ret, "lyn_destroy_stream") ret = sdk.lyn_destroy_event(self.ipe_event) common.error_check(ret, "lyn_destroy_event") ret = sdk.lyn_destroy_event(self.apu_event) common.error_check(ret, "lyn_destroy_event") # destory ipe desc and config ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc) common.error_check(ret, "lyn_ipe_destroy_pic_desc") ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc) common.error_check(ret, "lyn_ipe_destroy_pic_desc") ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config) common.error_check(ret, "lyn_ipe_destroy_config_desc") ret = sdk.lyn_plugin_unregister(self.plugin) common.error_check(ret, "lyn_plugin_unregister") # 卸载模型 for model_info in self.model_infos: ret = sdk.lyn_unload_model(model_info.model) common.error_check(ret, "lyn_unload_model") if self.__vdec_hdl != "": ret = sdk.lyn_vdec_close(self.__vdec_hdl) common.error_check(ret, "lyn_vdec_close") if self.__ctx != "": ret = sdk.lyn_destroy_context(self.__ctx) common.error_check(ret, "lyn_destroy_context") def send_thread_func(self): # 设置上下文环境 创建发送stream sdk.lyn_set_current_context(self.__ctx) eos = False while not eos: # 从解封装器读取一个包 pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl) eos = pkt.eos if ret != 0 and not eos: sdk.lyn_demux_close(self.__demux_hdl) time.sleep(500.0 / 1000) logger.warning("demux failed, reconnecting...") self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url) if not ret: self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl) common.error_check(ret, "lyn_synchronize_stream") logger.debug(f'fps = {self.fps}') continue # 发送给解码器解码 ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt) common.error_check(ret, "lyn_vdec_send_packet_async") ret = sdk.lyn_synchronize_stream(self.send_stream) common.error_check(ret, "lyn_synchronize_stream") # 释放packet内存并通知接收结果 if not eos: sdk.lyn_demux_free_packet(pkt) self.__send_num += 1 self.__send_queue.put(self.__send_num) else: self.__send_queue.put(-1) def recv_thread_func(self): # 设置上下文环境 创建接收stream sdk.lyn_set_current_context(self.__ctx) frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5) while self.__recv_num >= 0: self.__recv_num = self.__send_queue.take() cb_data = recv_cb_data() cb_data.frame.eos = self.__recv_num < 0 cb_data.frame.data = frame_pool.pop() cb_data.frame.size = self.vdec_out_info.predict_buf_size cb_data.frame_pool = frame_pool cb_data.send_num = self.__send_num cb_data.recv_num = self.__recv_num cb_data.attr = self.attr cb_data.block_queue = self.__ipe_queue cb_data.video_frame = self.video_frame # 插入接收指令,并添加接收完成回调函数 ret = sdk.lyn_vdec_recv_frame_async( self.recv_stream, self.__vdec_hdl, cb_data.frame ) common.error_check(ret, "lyn_vdec_recv_frame_async") ret = sdk.lyn_stream_add_async_callback( self.recv_stream, recv_frame_cb, cb_data ) common.error_check(ret, "lyn_stream_add_async_callback") def ipe_thread_func(self): # 设置上下文环境 创建ipe stream sdk.lyn_set_current_context(self.__ctx) eos = False while not eos: cb_data = self.__ipe_queue.take() eos = cb_data.frame.eos self.start_process_time = time.time() self.model_process(cb_data) # 这里的cb_data是从视频流解析出来的视频帧 def model_process(self, cb_data): self.frame_idx = (self.frame_idx + 1) % self.frame_step if self.frame_idx == 0: # print(f'{self.frame_idx} process') # 正常处理 if self.model_nums > 0: ipe_out_data = self.ipe_process(cb_data) for model_info in self.model_infos: apu_output_data = self.apu_process(model_info, ipe_out_data) self.plugin_process(model_info, apu_output_data, cb_data) # ret = sdk.lyn_stream_add_async_callback( # self.model_infos[-1].plugin_stream, # show_video_cb, # [cb_data, self.model_infos,self.last_alarm_time], # ) # common.error_check(ret, "lyn_stream_add_async_callback") # self.merge_plugin_process(cb_data) self.encode_process(cb_data) common.print_frame_rate(self.get_channel_info()) if self.model_nums > 0: ret = sdk.lyn_stream_add_async_callback( self.model_infos[-1].apu_stream, free_to_pool_callback, [self.ipe_output_mem_pool, ipe_out_data], ) common.error_check(ret, "lyn_stream_add_async_callback") else: # print(f'{self.frame_idx} draw') # 跳过ipe apu处理,直接用上次的box渲染 for model_info in self.model_infos: self.plugin_draw_process(model_info, cb_data) self.encode_process(cb_data) common.print_frame_rate(self.get_channel_info()) def plugin_process(self,model_info, apu_output_data, cb_data): pass def plugin_draw_process(self,model_info, cb_data): format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12) pythonapi.PyCapsule_GetPointer.restype = c_void_p pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p] frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None) boxes_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None) draw_para = struct.pack( 'P2IiP', boxes_info_ptr, self.codec_para.width, self.codec_para.height, format, frame_data_ptr, ) ret = sdk.lyn_plugin_run_async( model_info.plugin_stream, model_info.plugin, "lynDrawBoxAndText", draw_para, len(draw_para), ) common.error_check(ret, "lyn_plugin_run_async") def encode_process(self, cb_data: cb_data): if self.model_nums > 0: # 在 plugin 处理完之后,进行编码操作 ret = sdk.lyn_record_event(self.model_infos[-1].plugin_stream, self.plugin_event) common.error_check(ret, "lyn_record_event") ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event) common.error_check(ret, "lyn_stream_wait_event") if self.enc_head_flag: # 第一帧编码 self.enc_head_flag = False enc_packet = sdk.lyn_packet_t() enc_packet.size = self.vdec_out_info.predict_buf_size enc_packet.data = self.venc_recv_pool.pop() encode_data = save_file_cb_data() encode_data.output_path = self.attr.output_path encode_data.packet = enc_packet encode_data.recv_pool = self.venc_recv_pool encode_data.file_path = self.attr.output_path encode_data.video_frame = self.attr.video_frame ret = sdk.lyn_venc_get_paramsset_async( self.venc_recv_stream, self.venc_handle, enc_packet ) common.error_check(ret, "lyn_venc_get_paramsset_async") ret = sdk.lyn_stream_add_async_callback( self.venc_recv_stream, save_file_cb, [encode_data, cb_data] ) common.error_check(ret, "lyn_stream_add_async_callback") ret = sdk.lyn_venc_sendframe_async( self.venc_send_stream, self.venc_handle, cb_data.frame ) common.error_check(ret, "lyn_venc_sendframe_async") enc_packet = sdk.lyn_packet_t() enc_packet.size = self.vdec_out_info.predict_buf_size enc_packet.eos = cb_data.frame.eos enc_packet.data = self.venc_recv_pool.pop() encode_data = save_file_cb_data() encode_data.packet = enc_packet encode_data.recv_pool = self.venc_recv_pool encode_data.file_path = self.attr.output_path encode_data.output_path = self.attr.output_path encode_data.video_frame = self.attr.video_frame # ret = sdk.lyn_stream_add_async_callback( # self.venc_send_stream, # free_to_pool_callback, # [cb_data.frame_pool, cb_data.frame.data], # ) # common.error_check(ret, "lyn_stream_add_async_callback") ret = sdk.lyn_venc_recvpacket_async( self.venc_recv_stream, self.venc_handle, enc_packet ) common.error_check(ret, "lyn_venc_recvpacket_async") ret = sdk.lyn_stream_add_async_callback( self.venc_recv_stream, save_file_cb, [encode_data, cb_data] ) common.error_check(ret, "lyn_stream_add_async_callback") ret = sdk.lyn_stream_add_async_callback( self.venc_recv_stream, free_to_pool_callback, [cb_data.frame_pool, cb_data.frame.data], ) common.error_check(ret, "lyn_stream_add_async_callback") def ipe_process(self, cb_data: framepool_cb_data): sdk.lyn_set_current_context(self.__ctx) ipe_out_data = self.ipe_output_mem_pool.pop() # 设置ipe输入 ret = sdk.lyn_ipe_set_input_pic_desc( self.ipe_input_desc, cb_data.frame.data, self.vdec_out_info.width, self.vdec_out_info.height, self.__vdec_attr.output_fmt, ) common.error_check(ret, "lyn_ipe_set_input_pic_desc") ret = sdk.lyn_ipe_set_output_pic_data(self.ipe_output_desc, ipe_out_data) common.error_check(ret, "lyn_ipe_set_output_pic_data") ret = sdk.lyn_ipe_set_resize_config( self.ipe_config, self.resize_width, self.resize_height ) common.error_check(ret, "lyn_ipe_set_resize_config") ret = sdk.lyn_ipe_set_pad_config( self.ipe_config, self.pad_y, self.pad_x, self.pad_y, self.pad_x, 114, 114, 114, ) common.error_check(ret, "lyn_ipe_set_pad_config") ret = sdk.lyn_ipe_set_c2c_config( self.ipe_config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0 ) common.error_check(ret, "lyn_ipe_set_c2c_config") ret = sdk.lyn_ipe_cal_output_pic_desc( self.ipe_output_desc, self.ipe_input_desc, self.ipe_config, 0 ) common.error_check(ret, "lyn_ipe_cal_output_pic_desc") ret = sdk.lyn_ipe_process_async( self.ipe_stream, self.ipe_input_desc, self.ipe_output_desc, self.ipe_config ) common.error_check(ret, "lyn_ipe_process_async") return ipe_out_data def apu_process(self, model_info, ipe_out_data): # 等待IPE处理完成 ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event) common.error_check(ret, "lyn_record_event") ret = sdk.lyn_stream_wait_event(model_info.apu_stream, self.ipe_event) common.error_check(ret, "lyn_stream_wait_event") apu_output_data = model_info.apu_output_mem_pool.pop() ret = sdk.lyn_execute_model_async( model_info.apu_stream, model_info.model, ipe_out_data, apu_output_data, self.batch_size, ) common.error_check(ret, "lyn_execute_model_async") return apu_output_data def get_channel_info(self) -> str: return f'{self.attr.device_id}_{self.attr.output_path}'