# lynxi-casic-demo / scene_handler / base_scene_handler.py
# Last change: zhangyingjie, 24 Jan (29 KB) - "增加后台接口调用" (add backend API calls)
import struct
import numpy as np
import pylynchipsdk as sdk
import copy
import sys, os
import ctypes
from ctypes import *
import subprocess
import threading
import multiprocessing
import time

from global_logger import logger
from blockqueue import block_queue
from callback_data_struct import *
import common
import bufferpool
from detect_utils import bbox, Box, SubObject, SubClass, free_to_pool_callback, recv_frame_cb, set_padding_data
from info_query import ModelObject

def save_file_cb(params):
    """Stream callback: copy one encoded packet to the host and queue it.

    Args:
        params: Sequence whose first item is the ``save_file_cb_data`` that
            describes the encoder packet and whose second item is the
            callback data of the source frame (looked up but not used).

    Returns:
        The status code returned by ``sdk.lyn_memcpy``.
    """
    encode_data = params[0]
    _frame_data = params[1]  # unused; a too-short ``params`` still fails here

    enc_packet = encode_data.packet
    valid_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(enc_packet)
    common.error_check(ret, "lyn_enc_get_remote_packet_valid_size")

    # Numpy buffer that receives the valid bytes of the packet.
    host_buf = np.zeros(valid_size, dtype=np.byte)
    ret = sdk.lyn_memcpy(
        sdk.lyn_numpy_to_ptr(host_buf),
        enc_packet.data,
        valid_size,
        sdk.lyn_memcpy_dir_t.ServerToClient,
    )
    common.error_check(ret, "lyn_memcpy")

    # Publish the bytes together with the end-of-stream flag, then hand the
    # packet buffer back to the pool it was taken from.
    encode_data.video_frame.put([host_buf, enc_packet.eos])
    encode_data.recv_pool.push(enc_packet.data)
    return ret

class SceneModelHandler:
    """Device-side resources needed to run one detection model.

    Creating an instance loads the model and its plugin, creates the streams
    and events used for inference and post-processing, reads the tensor
    geometry from the model descriptor and uploads the per-object settings
    (ids, thresholds and optional regions) to device memory.
    """

    def __init__(self, model_path, plugin_path, model_name, model_code, objects, image_width, image_height) -> None:
        """Load the model and plugin and upload the object configuration.

        Args:
            model_path: Path passed to ``sdk.lyn_load_model``.
            plugin_path: Path passed to ``sdk.lyn_plugin_register``.
            model_name: Name of the model (stored and logged only).
            model_code: Code of the model (stored only).
            objects: Indexable collection of object settings.  Each item must
                provide ``object_code``, ``object_name``, ``conf_threshold``,
                ``alarm_threshold``, ``model_code`` and ``range`` (an iterable
                of points with ``x``/``y``, or a falsy value for "no region").
            image_width: Factor applied to every ``range`` x coordinate.
            image_height: Factor applied to every ``range`` y coordinate.

        Raises:
            ValueError: If the host staging buffer pointer cannot be read
                from its PyCapsule.
        """
        self.model_path = model_path
        self.plugin_path = plugin_path
        self.model_name = model_name
        self.model_code = model_code
        self.objects = objects
        logger.info(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}')

        self.model, ret = sdk.lyn_load_model(self.model_path)
        common.error_check(ret, "lyn_load_model")

        self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path)
        common.error_check(ret, "lyn_plugin_register")

        self.apu_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.plugin_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.apu_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.model_desc, ret = sdk.lyn_model_get_desc(self.model)
        common.error_check(ret, "lyn_model_get_desc")
        input_attr = self.model_desc.inputTensorAttrArray[0]
        output_attr = self.model_desc.outputTensorAttrArray[0]
        self.batch_size = input_attr.batchSize
        # presumably the input tensor is laid out as (N, H, W, C) — TODO confirm
        self.model_width = input_attr.dims[2]
        self.model_height = input_attr.dims[1]

        self.apu_output_size = self.model_desc.outputDataLen

        # presumably 5 = box (x, y, w, h) + objectness per anchor — TODO confirm
        self.class_num = output_attr.dims[3] - 5
        self.anchor_size = output_attr.dims[1]

        # Device buffers; the allocation status is checked like every other
        # SDK call so a failed allocation is not used further down.
        self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc")

        self.sub_objects_info, ret = sdk.lyn_malloc(ctypes.sizeof(SubObject))
        common.error_check(ret, "lyn_malloc")

        # Host staging buffer that is filled in below and then copied to
        # ``self.sub_objects_info``.
        # NOTE(review): this buffer is not released anywhere in this class —
        # confirm whether it should be freed after the copy.
        sub_obj = sdk.c_malloc(ctypes.sizeof(SubObject))

        # Get the raw pointer wrapped by the PyCapsule.
        ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
        ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
        raw_ptr = ctypes.pythonapi.PyCapsule_GetPointer(sub_obj, None)
        if not raw_ptr:
            raise ValueError("Failed to extract pointer from PyCapsule")

        # View the staging buffer as a SubObject structure.
        sub_obj_instance = ctypes.cast(raw_ptr, ctypes.POINTER(SubObject)).contents
        sub_obj_instance.objectsnum = len(self.objects)

        for i, obj in enumerate(self.objects):
            entry = sub_obj_instance.objects[i]
            entry.object_id = int(obj.object_code)
            entry.object_name = obj.object_name
            entry.conf_threshold = obj.conf_threshold
            entry.alarm_threshold = obj.alarm_threshold
            entry.model_code = obj.model_code
            if obj.range:
                # Flatten the points to (x1 y1 x2 y2 x3 y3 x4 y4), scaled by
                # the image size.
                abs_range = []
                for boundary_point in obj.range:
                    abs_range.append(boundary_point.x * image_width)
                    abs_range.append(boundary_point.y * image_height)
                logger.debug(f'abs_range = {abs_range}')
                entry.range = (ctypes.c_float * 8)(*abs_range)
            else:
                entry.range = (0, 0, 0, 0, 0, 0, 0, 0)

        ret = sdk.lyn_memcpy(
            self.sub_objects_info, sub_obj, ctypes.sizeof(SubObject), sdk.lyn_memcpy_dir_t.ClientToServer
        )
        common.error_check(ret, "save_boxinfo_cb lyn_memcpy")

        # Pool of 5 output buffers, each sized for one full batch.
        self.apu_output_mem_pool = bufferpool.buffer_pool(
            self.apu_output_size * self.batch_size, 5
        )

        logger.debug(f'batch_size = {self.batch_size}')
        logger.debug(f'model_width = {self.model_width}')
        logger.debug(f'model_height = {self.model_height}')
        logger.debug(f'apu_output_size = {self.apu_output_size}')
        logger.debug(f'class_num = {self.class_num}')
        logger.debug(f'anchor_size = {self.anchor_size}')

class BaseSceneHandler:
    def __init__(self, attr: device_process_attr) -> None:
        """Open the input stream and allocate the resources of the pipeline.

        Creates the device context, opens the demuxer and the video decoder
        for ``attr.url``, creates the streams, events and IPE descriptors,
        derives the pre-processing geometry, opens the H.264 encoder and
        starts the ffmpeg output process.

        Args:
            attr: Channel settings.  This method reads ``device_id``, ``url``,
                ``output_fmt``, ``scale`` and ``video_frame`` and writes the
                decoded ``width``/``height`` to a shallow copy kept in
                ``self.attr``.
        """
        # Placeholders; each is replaced further down or in run().  close()
        # compares some of them against "" so the values are kept as-is.
        self.attr = ""
        self.__ctx = ""
        self.__send_thread = ""
        self.__recv_thread = ""
        self.__ipe_thread = ""
        self.__demux_hdl = ""
        self.codec_para = ""
        self.__vdec_hdl = ""
        self.__vdec_attr = sdk.lyn_vdec_attr_t()
        self.__send_queue = block_queue()
        self.__ipe_queue = block_queue()
        self.__send_num = 0
        self.__recv_num = 0

        # Create the device context.
        self.attr = copy.copy(attr)
        self.video_frame = attr.video_frame
        self.enc_head_flag = True  # cleared by encode_process on its first call
        self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id)
        common.error_check(ret, "create context")
        ret = sdk.lyn_register_error_handler(common.default_stream_error_handle)
        common.error_check(ret, 'lyn_register_error_handler')

        # Open the demuxer.
        self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
        common.error_check(ret, "lyn_demux_open ")
        self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_codec_para ")
        self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_framerate")
        logger.debug(f'fps = {self.fps}')

        # Open the decoder.
        self.__vdec_attr.codec_id = self.codec_para.codec_id
        self.__vdec_attr.output_fmt = self.attr.output_fmt
        self.__vdec_attr.scale = self.attr.scale
        self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr)
        common.error_check(ret, "lyn_vdec_open ")
        self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info(
            self.codec_para, self.__vdec_attr
        )
        # Check the status before the result is read for logging.
        common.error_check(ret, "lyn_vdec_get_out_info ")
        logger.debug(f'self.vdec_out_info.width = {self.vdec_out_info.width}')
        logger.debug(f'self.vdec_out_info.height = {self.vdec_out_info.height}')
        self.attr.width = self.vdec_out_info.width
        self.attr.height = self.vdec_out_info.height

        # Create the streams, events and IPE descriptors.
        self.send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.ipe_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.ipe_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc")
        self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc")
        self.ipe_config, ret = sdk.lyn_ipe_create_config_desc()
        common.error_check(ret, "lyn_ipe_create_config_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_reset_config_desc")

        # Model information.  The list is empty here, so the fallback values
        # below are the ones that apply in this class.
        # NOTE(review): presumably subclasses fill ``model_infos`` with
        # SceneModelHandler instances and refresh the derived values — confirm.
        self.model_infos = []
        self.model_nums = len(self.model_infos)

        has_models = self.model_nums > 0
        self.batch_size = self.model_infos[0].batch_size if has_models else 1
        self.model_width = self.model_infos[0].model_width if has_models else 640
        self.model_height = self.model_infos[0].model_height if has_models else 640

        logger.debug(f'self.batch_size = {self.batch_size}')
        logger.debug(f'self.model_width = {self.model_width}')
        logger.debug(f'self.model_height = {self.model_height}')

        # Size of one pre-processed image: width * height * channel count
        # (taken from the first model's input tensor, 3 when there is none).
        self.ipe_output_size = (
            self.model_width
            * self.model_height
            * (self.model_infos[0].model_desc.inputTensorAttrArray[0].dims[3] if has_models else 3)
        )

        (
            self.resize_width,
            self.resize_height,
            self.pad_x,
            self.pad_y,
        ) = set_padding_data(
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.model_width,
            self.model_height,
        )

        # Create the buffer pools.
        self.ipe_output_mem_pool = bufferpool.buffer_pool(
            self.ipe_output_size * self.batch_size, 5
        )

        self.venc_recv_pool = bufferpool.buffer_pool(
            self.vdec_out_info.predict_buf_size, 5
        )
        logger.debug(f'self.ipe_output_size = {self.ipe_output_size}')
        logger.debug(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}')

        # Configure and open the encoder.
        self.venc_attr = sdk.lyn_venc_attr_t()
        ret = sdk.lyn_venc_set_default_params(self.venc_attr)
        common.error_check(ret, "lyn_venc_set_default_params")
        self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264
        self.venc_attr.width = self.vdec_out_info.width
        self.venc_attr.height = self.vdec_out_info.height
        self.venc_attr.bit_depth = 8
        self.venc_attr.bframes_num = 0
        self.venc_attr.pframes_num = 5
        self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12
        self.venc_attr.target_bitrate = 6000000
        self.venc_attr.level = -1
        self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr)
        # Label fixed: this is the encoder, not the decoder.
        common.error_check(ret, "lyn_venc_open")

        self.total_boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc")
        self.start_process_time = None
        self.last_push_time = None
        self.stream_out_process = None
        self.init_output_process()

        # Inference runs on every ``frame_step``-th frame.  The counter starts
        # at step - 1 so that the very first frame is one of them.
        self.frame_step = 5
        self.frame_idx = self.frame_step - 1

        self.last_alarm_time = {}  # key: model_code-object_code, value: time

    
    def init_output_process(self):
        """Start the ffmpeg child process that republishes the stream.

        ffmpeg reads from its stdin, copies the video stream without
        re-encoding and publishes it over RTSP (TCP transport) to
        ``self.attr.output_path``.  A start-up failure is logged and
        ``self.stream_out_process`` is left unchanged.  When a process is
        available, a daemon thread forwards its stderr lines to the logger.
        """
        ffmpeg_cmd = [
            'ffmpeg',
            '-i', '-',
            "-c:v", "copy",
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            self.attr.output_path,
        ]

        def forward_stderr(proc):
            # readline() returns b'' once the pipe is closed, ending the loop.
            for raw_line in iter(proc.stderr.readline, b''):
                logger.info(f"stderr: {raw_line.decode('utf-8').strip()}")

        # Launch the ffmpeg child process.
        try:
            self.stream_out_process = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError as e:
            logger.exception(f"Failed to start process: {e}")
        except subprocess.SubprocessError as e:
            logger.exception(f"Subprocess error: {e}")
        except Exception as e:
            logger.exception(f"An unexpected error occurred: {e}")

        if not self.stream_out_process:
            return

        threading.Thread(
            target=forward_stderr,
            args=(self.stream_out_process,),
            daemon=True,
        ).start()


    def push_video(self):
        # show window
        global cancel_flag
        
        frame_rate = self.fps  # 目标帧率
        frame_interval = 1 / frame_rate  # 每帧间隔时间

        if not self.last_push_time:
            self.last_push_time = time.time()

        while True:
            try:
                frame = self.video_frame.get(False)
            except:
                time.sleep(0.01)
                continue
            if frame[1]:
                # cv2.destroyAllWindows()
                sys.exit()

            if frame and self.stream_out_process:
                image = frame[0]
                # resized_image = cv2.resize(image, (640, 360))

                self.stream_out_process.stdin.write(image.tobytes())
                # elapsed_time = time.time() - self.start_process_time
                elapsed_time = time.time() - self.last_push_time
                sleep_time = frame_interval - elapsed_time - 0.01
                # print(f'elapsed_time = {elapsed_time*1000:.3f}ms, sleep_time = {sleep_time*1000:.3f}ms')
                time.sleep(max(0, sleep_time))
                self.last_push_time = time.time()

    def run(self):
        # 开启发送线程
        self.__send_thread = threading.Thread(
            target=self.send_thread_func, args=()
        )
        self.__send_thread.start()

        # 开启接收线程
        self.__recv_thread = threading.Thread(
            target=self.recv_thread_func, args=()
        )
        self.__recv_thread.start()

        # 开启图像处理和推理线程
        self.__ipe_thread = threading.Thread(
            target=self.ipe_thread_func, args=()
        )
        self.__ipe_thread.start()

    def close(self):
        """Wait for the worker threads and release the SDK resources.

        Order: join the threads, synchronize every stream, destroy the
        streams, destroy the events, destroy the IPE descriptors, unregister
        the plugins, unload the models, close the decoder and destroy the
        context.

        The inference/plugin streams, events and plugins belong to the
        entries of ``self.model_infos`` and are released per model.
        Handler-level ``apu_stream``, ``plugin_stream``, ``apu_event`` and
        ``plugin`` attributes are released as well when they exist, which
        keeps subclasses that define them working.

        NOTE(review): the demuxer, the encoder handle, the device buffers and
        the ffmpeg process are not released here — confirm whether they
        should be.
        """
        # run() may never have been called; the thread attributes then still
        # hold their "" placeholders.
        for worker in (self.__send_thread, self.__recv_thread, self.__ipe_thread):
            if isinstance(worker, threading.Thread) and worker.is_alive():
                worker.join()

        def unique(handles):
            """Drop missing handles and repeated references to one handle."""
            kept = []
            for handle in handles:
                if handle is not None and not any(handle is seen for seen in kept):
                    kept.append(handle)
            return kept

        streams = unique(
            [
                self.send_stream,
                self.recv_stream,
                self.ipe_stream,
                getattr(self, "apu_stream", None),
                getattr(self, "plugin_stream", None),
            ]
            + [
                stream
                for model_info in self.model_infos
                for stream in (model_info.apu_stream, model_info.plugin_stream)
            ]
            + [self.venc_recv_stream, self.venc_send_stream]
        )
        # Let all queued work finish before any stream is destroyed.
        for stream in streams:
            ret = sdk.lyn_synchronize_stream(stream)
            common.error_check(ret, "lyn_synchronize_stream")
        for stream in streams:
            ret = sdk.lyn_destroy_stream(stream)
            common.error_check(ret, "lyn_destroy_stream")

        events = unique(
            [self.ipe_event, getattr(self, "apu_event", None), self.plugin_event]
            + [
                event
                for model_info in self.model_infos
                for event in (model_info.apu_event, model_info.plugin_event)
            ]
        )
        for event in events:
            ret = sdk.lyn_destroy_event(event)
            common.error_check(ret, "lyn_destroy_event")

        # Destroy the IPE descriptors and config.
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_destroy_config_desc")

        plugins = unique(
            [getattr(self, "plugin", None)]
            + [model_info.plugin for model_info in self.model_infos]
        )
        for plugin in plugins:
            ret = sdk.lyn_plugin_unregister(plugin)
            common.error_check(ret, "lyn_plugin_unregister")

        # Unload the models.
        for model_info in self.model_infos:
            ret = sdk.lyn_unload_model(model_info.model)
            common.error_check(ret, "lyn_unload_model")

        if self.__vdec_hdl != "":
            ret = sdk.lyn_vdec_close(self.__vdec_hdl)
            common.error_check(ret, "lyn_vdec_close")

        if self.__ctx != "":
            ret = sdk.lyn_destroy_context(self.__ctx)
            common.error_check(ret, "lyn_destroy_context")


    def send_thread_func(self):
        """Read packets from the demuxer and feed them to the decoder.

        Runs until the demuxer reports end of stream.  After every packet
        handed to the decoder the running packet count is put on the send
        queue; ``-1`` is put for the end-of-stream packet.

        When reading fails before end of stream, the demuxer is closed and
        reopened, retrying every 500 ms until the open succeeds, and the
        frame rate is read again.
        """
        # Bind the device context to this thread.
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            # Read one packet from the demuxer.
            pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl)
            eos = pkt.eos
            if ret != 0 and not eos:
                sdk.lyn_demux_close(self.__demux_hdl)
                # Stay here until the source opens again, so the next read
                # never runs on a handle that failed to open.
                while True:
                    time.sleep(500.0 / 1000)
                    logger.warning("demux failed, reconnecting...")
                    self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
                    if not ret:
                        break

                self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl)
                # Label fixed: this checks the frame-rate query.
                common.error_check(ret, "lyn_demux_get_framerate")
                logger.debug(f'fps = {self.fps}')
                continue

            # Send the packet to the decoder and wait until it is consumed.
            ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt)
            common.error_check(ret, "lyn_vdec_send_packet_async")
            ret = sdk.lyn_synchronize_stream(self.send_stream)
            common.error_check(ret, "lyn_synchronize_stream")

            # Free the packet and tell the receiving thread about it.
            if not eos:
                sdk.lyn_demux_free_packet(pkt)
                self.__send_num += 1
                self.__send_queue.put(self.__send_num)
            else:
                self.__send_queue.put(-1)


    def recv_thread_func(self):
        """Queue one decoder receive request per notification from the sender.

        For every value taken from the send queue a frame buffer is popped
        from a local pool, an asynchronous receive is queued on
        ``self.recv_stream`` and ``recv_frame_cb`` is registered to run after
        it.  A negative value marks end of stream: that request is still
        queued (flagged as eos) and the loop then stops.
        """
        # Bind the device context to this thread.
        sdk.lyn_set_current_context(self.__ctx)
        decoded_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5)

        while self.__recv_num >= 0:
            self.__recv_num = self.__send_queue.take()

            request = recv_cb_data()
            request.frame.eos = self.__recv_num < 0
            request.frame.data = decoded_pool.pop()
            request.frame.size = self.vdec_out_info.predict_buf_size
            request.frame_pool = decoded_pool
            request.send_num = self.__send_num
            request.recv_num = self.__recv_num
            request.attr = self.attr
            request.block_queue = self.__ipe_queue
            request.video_frame = self.video_frame

            # Queue the receive command, followed by its completion callback.
            ret = sdk.lyn_vdec_recv_frame_async(
                self.recv_stream,
                self.__vdec_hdl,
                request.frame,
            )
            common.error_check(ret, "lyn_vdec_recv_frame_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.recv_stream,
                recv_frame_cb,
                request,
            )
            common.error_check(ret, "lyn_stream_add_async_callback")


    def ipe_thread_func(self):
        """Take decoded frames from the IPE queue and run ``model_process``.

        The frame flagged as end of stream is still processed; the loop
        stops after it.
        """
        # Bind the device context to this thread.
        sdk.lyn_set_current_context(self.__ctx)
        while True:
            frame_cb = self.__ipe_queue.take()  # one decoded video frame
            last_frame = frame_cb.frame.eos
            self.start_process_time = time.time()
            self.model_process(frame_cb)
            if last_frame:
                break

    def model_process(self, cb_data):
        """Process one decoded frame: infer or redraw, then encode.

        Every ``frame_step``-th frame goes through IPE, inference and
        ``plugin_process`` for each model.  On the other frames only
        ``plugin_draw_process`` is called for each model, i.e. the boxes of
        the last inference are drawn again.  Every frame is then encoded and
        counted for the frame-rate printout.

        Args:
            cb_data: Callback data of the decoded frame.
        """
        self.frame_idx = (self.frame_idx + 1) % self.frame_step
        inference_frame = self.frame_idx == 0
        ipe_out_data = None

        if not inference_frame:
            # Skip IPE/APU and draw with the boxes of the last inference.
            for model_info in self.model_infos:
                self.plugin_draw_process(model_info, cb_data)
        elif self.model_nums > 0:
            # Full processing of this frame.
            ipe_out_data = self.ipe_process(cb_data)
            for model_info in self.model_infos:
                apu_output_data = self.apu_process(model_info, ipe_out_data)
                self.plugin_process(model_info, apu_output_data, cb_data)

        self.encode_process(cb_data)
        common.print_frame_rate(self.get_channel_info())

        if inference_frame and self.model_nums > 0:
            # Return the pre-processed image buffer to its pool via a callback
            # queued on the last model's inference stream.
            ret = sdk.lyn_stream_add_async_callback(
                self.model_infos[-1].apu_stream,
                free_to_pool_callback,
                [self.ipe_output_mem_pool, ipe_out_data],
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

    def plugin_process(self,model_info, apu_output_data, cb_data):
        pass

    def plugin_draw_process(self,model_info, cb_data):
        """Queue the plugin call that draws the model's stored boxes on a frame.

        Packs the box buffer pointer, the picture size, the pixel format and
        the frame pointer and runs the plugin function ``lynDrawBoxAndText``
        asynchronously on the model's plugin stream.

        Args:
            model_info: Model entry providing ``boxes_info``, ``plugin`` and
                ``plugin_stream``.
            cb_data: Callback data whose ``frame.data`` is drawn on.
        """
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        pythonapi.PyCapsule_GetPointer.restype = c_void_p

        # Raw addresses of the frame and of the box buffer.
        frame_addr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        boxes_addr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)
        pixel_format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)

        # NOTE(review): the size comes from codec_para while the other stages
        # use vdec_out_info — confirm they agree when decoder scaling is on.
        # Layout: pointer, two unsigned ints, int, pointer.
        plugin_args = struct.pack(
            'P2IiP',
            boxes_addr,
            self.codec_para.width,
            self.codec_para.height,
            pixel_format,
            frame_addr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream,
            model_info.plugin,
            "lynDrawBoxAndText",
            plugin_args,
            len(plugin_args),
        )
        common.error_check(ret, "lyn_plugin_run_async")

    def encode_process(self, cb_data: cb_data):
        """Encode one frame and queue the callbacks that deliver the packet.

        When models are configured, the encoder send stream first waits for
        the last model's plugin stream.  On the first call the encoder
        parameter set is requested as well.  The frame is then sent to the
        encoder, a packet is requested, ``save_file_cb`` is registered to
        deliver it, and the frame buffer is returned to its pool.

        Args:
            cb_data: Callback data of the frame to encode.
        """
        def new_request(with_eos):
            """Build an encoder packet and its ``save_file_cb`` payload."""
            packet = sdk.lyn_packet_t()
            packet.size = self.vdec_out_info.predict_buf_size
            if with_eos:
                packet.eos = cb_data.frame.eos
            packet.data = self.venc_recv_pool.pop()

            request = save_file_cb_data()
            request.packet = packet
            request.recv_pool = self.venc_recv_pool
            request.file_path = self.attr.output_path
            request.output_path = self.attr.output_path
            request.video_frame = self.attr.video_frame
            return packet, request

        if self.model_nums > 0:
            # Encode only after the plugin stage has finished.
            ret = sdk.lyn_record_event(self.model_infos[-1].plugin_stream, self.plugin_event)
            common.error_check(ret, "lyn_record_event")
            ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event)
            common.error_check(ret, "lyn_stream_wait_event")

        if self.enc_head_flag:
            # First frame: fetch the encoder parameter set once.
            self.enc_head_flag = False
            head_packet, head_request = new_request(False)
            ret = sdk.lyn_venc_get_paramsset_async(
                self.venc_recv_stream, self.venc_handle, head_packet
            )
            common.error_check(ret, "lyn_venc_get_paramsset_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.venc_recv_stream, save_file_cb, [head_request, cb_data]
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_venc_sendframe_async(
            self.venc_send_stream, self.venc_handle, cb_data.frame
        )
        common.error_check(ret, "lyn_venc_sendframe_async")

        frame_packet, frame_request = new_request(True)
        ret = sdk.lyn_venc_recvpacket_async(
            self.venc_recv_stream, self.venc_handle, frame_packet
        )
        common.error_check(ret, "lyn_venc_recvpacket_async")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream, save_file_cb, [frame_request, cb_data]
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

        # Return the frame buffer to its pool via a callback queued on the
        # encoder receive stream.
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream,
            free_to_pool_callback,
            [cb_data.frame_pool, cb_data.frame.data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")


    def ipe_process(self, cb_data: framepool_cb_data):
        """Queue the IPE pre-processing of one decoded frame.

        Configures a resize to ``resize_width`` x ``resize_height``, padding
        with the value 114 and a conversion to RGB24, then starts the
        operation asynchronously on ``self.ipe_stream``.

        Args:
            cb_data: Callback data whose ``frame.data`` is the input picture.

        Returns:
            The output buffer popped from ``self.ipe_output_mem_pool``.
        """
        sdk.lyn_set_current_context(self.__ctx)
        resized_buf = self.ipe_output_mem_pool.pop()

        check = common.error_check
        src_desc = self.ipe_input_desc
        dst_desc = self.ipe_output_desc
        config = self.ipe_config

        # Input picture: the decoder output.
        ret = sdk.lyn_ipe_set_input_pic_desc(
            src_desc,
            cb_data.frame.data,
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.__vdec_attr.output_fmt,
        )
        check(ret, "lyn_ipe_set_input_pic_desc")

        ret = sdk.lyn_ipe_set_output_pic_data(dst_desc, resized_buf)
        check(ret, "lyn_ipe_set_output_pic_data")

        ret = sdk.lyn_ipe_set_resize_config(config, self.resize_width, self.resize_height)
        check(ret, "lyn_ipe_set_resize_config")

        # Four pad sizes followed by three fill values of 114.
        ret = sdk.lyn_ipe_set_pad_config(
            config, self.pad_y, self.pad_x, self.pad_y, self.pad_x, 114, 114, 114
        )
        check(ret, "lyn_ipe_set_pad_config")

        ret = sdk.lyn_ipe_set_c2c_config(
            config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0
        )
        check(ret, "lyn_ipe_set_c2c_config")

        ret = sdk.lyn_ipe_cal_output_pic_desc(dst_desc, src_desc, config, 0)
        check(ret, "lyn_ipe_cal_output_pic_desc")

        ret = sdk.lyn_ipe_process_async(self.ipe_stream, src_desc, dst_desc, config)
        check(ret, "lyn_ipe_process_async")
        return resized_buf
    

    def apu_process(self, model_info, ipe_out_data):
        """Queue inference of one model on a pre-processed image.

        The model's inference stream is made to wait for the IPE stream
        before the model is executed asynchronously.

        Args:
            model_info: Model entry providing ``apu_stream``, ``model`` and
                ``apu_output_mem_pool``.
            ipe_out_data: Buffer returned by ``ipe_process``.

        Returns:
            The output buffer popped from the model's output pool.
        """
        inference_stream = model_info.apu_stream

        # Wait for the IPE stage to finish.
        ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(inference_stream, self.ipe_event)
        common.error_check(ret, "lyn_stream_wait_event")

        result_buf = model_info.apu_output_mem_pool.pop()
        # The batch size is the handler's, not ``model_info.batch_size``.
        ret = sdk.lyn_execute_model_async(
            inference_stream,
            model_info.model,
            ipe_out_data,
            result_buf,
            self.batch_size,
        )
        common.error_check(ret, "lyn_execute_model_async")
        return result_buf
    
    def get_channel_info(self) -> str:
        return f'{self.attr.device_id}_{self.attr.output_path}'