Newer
Older
lynxi-casic-demo / multi_model_demo copy.py
from blockqueue import block_queue
from callback_data_struct import *
import common
import bufferpool

import math
import queue
import sys, os
from types import SimpleNamespace
from typing import List, Tuple
import time
import numpy as np
import pylynchipsdk as sdk
import copy
import threading
import multiprocessing
import signal
import sys
import json
import ctypes
from ctypes import *
import struct

import subprocess
import argparse

# import ffmpeg
try:
    import cv2
except ImportError:
    cv2 = None


model_config = [
    {
        'model_path': 'models/yolov5s/Net_0',
        'plugin_path': 'plugins/libYolov5Plugin.so',
        'model_name': 'yolov5s',
    },
    {
        'model_path': 'models/yuntai/Net_0',
        'plugin_path': 'plugins/libMultiPlugin.so',
        'model_name': 'yuntai',
    }
]
stream_out_process = None


def cancel_process(signum, frame):
    global cancel_flag
    cancel_flag.value = True


cancel_flag = multiprocessing.Value('b', False)


class bbox(ctypes.Structure):
    _fields_ = [
        ("xmin", ctypes.c_uint32),
        ("ymin", ctypes.c_uint32),
        ("xmax", ctypes.c_uint32),
        ("ymax", ctypes.c_uint32),
        ("score", ctypes.c_float),
        ("label", ctypes.c_wchar * 64),
    ]


class Box(ctypes.Structure):
    _fields_ = [("boxesnum", ctypes.c_uint32), ("boxes", ctypes.ARRAY(bbox, 256))]

class ModelInfo:
    def __init__(self, model_path, plugin_path, model_name = '') -> None:
        self.model_path = model_path
        self.plugin_path = plugin_path
        self.model_name = model_name
        print(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}')

        self.model, ret = sdk.lyn_load_model(self.model_path)
        common.error_check(ret, "lyn_load_model")

        self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path)
        common.error_check(ret, "lyn_plugin_register")

        self.apu_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.plugin_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.apu_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.model_desc, ret = sdk.lyn_model_get_desc(self.model)
        common.error_check(ret, "lyn_model_get_desc")
        self.batch_size = self.model_desc.inputTensorAttrArray[0].batchSize
        self.model_width = self.model_desc.inputTensorAttrArray[0].dims[2]
        self.model_height = self.model_desc.inputTensorAttrArray[0].dims[1]

        self.apu_output_size = self.model_desc.outputDataLen

        self.class_num = self.model_desc.outputTensorAttrArray[0].dims[3] - 5
        self.anchor_size = self.model_desc.outputTensorAttrArray[0].dims[1]
        self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))

        self.apu_output_mem_pool = bufferpool.buffer_pool(
            self.apu_output_size * self.batch_size, 5
        )
        
        print(f'batch_size = {self.batch_size}')
        print(f'model_width = {self.model_width}')
        print(f'model_height = {self.model_height}')
        print(f'apu_output_size = {self.apu_output_size}')
        print(f'class_num = {self.class_num}')
        print(f'anchor_size = {self.anchor_size}')



def recv_frame_cb(cb_data: cb_data):
    # 放到ipe的处理队列中
    cb_data.block_queue.put(cb_data)
    return 0


def free_to_pool_callback(params):
    vdec_out_pool = params[0]
    data = params[1]
    vdec_out_pool.push(data)
    return 0


def set_even(num):
    num = math.floor(num)
    if num % 2 != 0:
        res = num - 1
    else:
        res = num
    return res


def set_padding_data(vidoe_width, video_height, model_width, model_height):
    if vidoe_width > video_height:
        resize_width = model_width
        pad_x = 0
        resize_height = set_even(int(video_height * model_width / vidoe_width))
        pad_y = (model_height - resize_height) / 2
    else:
        resize_height = model_height
        pad_y = 0
        resize_width = set_even(int(vidoe_width * model_width / video_height))
        pad_x = (model_width - resize_width) / 2

    return int(resize_width), int(resize_height), int(pad_x), int(pad_y)


def show_video_cb(cb_data: cb_data):
    data = np.empty(cb_data.frame.size, dtype=np.uint8)
    data_ptr = sdk.lyn_numpy_to_ptr(data)
    ret = sdk.lyn_memcpy(
        data_ptr,
        cb_data.frame.data,
        cb_data.frame.size,
        sdk.lyn_memcpy_dir_t.ServerToClient,
    )
    common.error_check(ret, "lyn_memcpy")
    yuvImg = np.reshape(
        data, (cb_data.attr.height * 3 // 2, cb_data.attr.width)
    ).astype(np.uint8)
    import cv2

    rgbImg = cv2.cvtColor(yuvImg, cv2.COLOR_YUV2BGR_NV12)
    if cb_data.frame.eos:
        cb_data.attr.video_frame.put([rgbImg, cb_data.frame.eos])
    else:
        cb_data.attr.video_frame.queue.clear()
        cb_data.attr.video_frame.put([rgbImg, cb_data.frame.eos])
    cb_data.frame_pool.push(cb_data.frame.data)
    return 0


def save_file_cb(cb_data: save_file_cb_data):
    packet = cb_data.packet
    data_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(packet)
    common.error_check(ret, "lyn_enc_get_remote_packet_valid_size")
    data = np.zeros(data_size, dtype=np.byte)
    data_ptr = sdk.lyn_numpy_to_ptr(data)
    ret = sdk.lyn_memcpy(
        data_ptr, packet.data, data_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "lyn_memcpy")
    
    cb_data.video_frame.put([data, packet.eos])
    cb_data.recv_pool.push(packet.data)

    # with open(cb_data.output_path, "ab") as f:
    #     f.write(data.tobytes())
    return ret



def show_boxinfo_cb(params):
    model_name = params[0]
    boxes_info = params[1]
    dst_img_size = ctypes.sizeof(Box)
    host_buf_arr = np.ones(dst_img_size, dtype=np.uint8)
    host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
    ret = sdk.lyn_memcpy(
        host_buf, boxes_info, dst_img_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "save_boxinfo_cb lyn_memcpy")
    host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
    c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents
    dump_box_json(model_name, c_box)
    return 0


def dump_box_json(model_name, c_box):
    box_dict = {"boxesnum": c_box.boxesnum, "boxes": []}
    for i in range(c_box.boxesnum):
        bbox_instance = c_box.boxes[i]
        bbox_dict = {
            "xmin": bbox_instance.xmin,
            "ymin": bbox_instance.ymin,
            "xmax": bbox_instance.xmax,
            "ymax": bbox_instance.ymax,
            "score": bbox_instance.score,
            "label": bbox_instance.label,
        }
        box_dict["boxes"].append(bbox_dict)
    
    json_str = json.dumps(box_dict, indent=2, ensure_ascii=False)
    print(model_name,box_dict['boxesnum'])
    # print()

    # Serialize the dictionary to JSON
    # with open(output_path, 'a', encoding="utf-8") as f:
    #     json.dump(box_dict, f, indent=2, ensure_ascii=False)
    #     f.write('\n')


def init_output_process(output_path):
    global stream_out_process

    global stream_out_process
    # command = ['ffmpeg',
    #            '-re',
    #            # 'rtsp_transport', 'tcp',
    #            '-f', 'rawvideo',
    #            '-vcodec', 'rawvideo',
    #            '-pix_fmt', 'bgr24',
    #            '-s', "{}x{}".format(640, 360),
    #            '-r', str(25),
    #            '-i', '-',
    #            '-c:v', 'libx264',
    #            # '-pix_fmt', 'yuv420p',
    #            '-preset', 'ultrafast',
    #            '-tune', 'zerolatency',
    #            '-b:v', '2000k',  # 控制码率
    #            '-maxrate', '2000k',
    #            '-bufsize', '4000k',  # 设置缓冲区
    #            '-f', 'rtsp',
    #            '-g', '10',
    #            output_path]
    command = ['ffmpeg',
            '-re',
            "-f", "h264",
            '-i', '-',
            "-c:v", "copy",
            '-r', str(25),
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            '-b:v', '2000k',  # 控制码率
            '-maxrate', '2000k',
            '-bufsize', '4000k',  # 设置缓冲区
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            '-g', '10',
            output_path]

    # 启动FFmpeg子进程
    try:
        stream_out_process = subprocess.Popen(command, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        print(f"Failed to start process: {e}")
    except subprocess.SubprocessError as e:
        print(f"Subprocess error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

    # stream_out_process = (
    # ffmpeg
    # .input('pipe:', format='rawvideo', pix_fmt='bgr24', s='640x360', r=25)
    # .output(output_path, vcodec='libx264',
    #         preset='ultrafast', tune='zerolatency',
    #         bitrate='1000k',maxrate='1000k', bufsize='2000k', f='rtsp',g=10)
    # .run_async(pipe_stdin=True, pipe_stderr=True,overwrite_output=True, cmd=['ffmpeg', '-report'])
    # )

    def read_stderr(process):
        for line in iter(process.stderr.readline, b''):
            print(f"stderr: {line.decode('utf-8')}", end='')

    if stream_out_process:
        stderr_thread = threading.Thread(target=read_stderr, args=(stream_out_process,))
        stderr_thread.daemon = True
        stderr_thread.start()


def push_video(video_frame, cancel_flag):
    # show window
    global stream_out_process
    while True:
        try:
            frame = video_frame.get(False)
        except:
            if cancel_flag.value:
                sys.exit()
            else:
                continue
        if frame[1] or cancel_flag.value:
            # cv2.destroyAllWindows()
            sys.exit()

        if frame:
            image = frame[0]
            # resized_image = cv2.resize(image, (640, 360))
            stream_out_process.stdin.write(image.tobytes())


class yolov5:
    def __init__(self, attr: infer_process_attr) -> None:
        self.attr = ""
        self.__ctx = ""
        self.__send_thread = ""
        self.__recv_thread = ""
        self.__ipe_thread = ""
        self.__demux_hdl = ""
        self.codec_para = ""
        self.__vdec_hdl = ""
        self.__vdec_attr = sdk.lyn_vdec_attr_t()
        self.__send_queue = block_queue()
        self.__ipe_queue = block_queue()
        self.__send_num = 0
        self.__recv_num = 0
        # 创建上下文环境
        self.attr = copy.copy(attr)
        self.video_frame = attr.video_frame
        self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id)
        self.enc_head_flag = True
        common.error_check(ret, "create context")
        ret = sdk.lyn_register_error_handler(common.default_stream_error_handle)
        common.error_check(ret, 'lyn_register_error_handler')



        # 打开解封装器
        self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
        common.error_check(ret, "lyn_demux_open ")
        self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_codec_para ")

        # 打开解码器
        self.__vdec_attr.codec_id = self.codec_para.codec_id
        self.__vdec_attr.output_fmt = self.attr.output_fmt
        self.__vdec_attr.scale = self.attr.scale
        self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr)
        common.error_check(ret, "lyn_vdec_open ")
        self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info(
            self.codec_para, self.__vdec_attr
        )
        print(f'self.vdec_out_info.width = {self.vdec_out_info.width}')
        print(f'self.vdec_out_info.height = {self.vdec_out_info.height}')
        common.error_check(ret, "lyn_vdec_get_out_info ")

        
        # 创建stream、event和ipe desc
        self.send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.ipe_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        # self.apu_stream, ret = sdk.lyn_create_stream()
        # common.error_check(ret, "lyn_create_stream")
        # self.plugin_stream, ret = sdk.lyn_create_stream()
        # common.error_check(ret, "lyn_create_stream")
        self.venc_recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.ipe_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        # self.apu_event, ret = sdk.lyn_create_event()
        # common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc")
        self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc")
        self.ipe_config, ret = sdk.lyn_ipe_create_config_desc()
        common.error_check(ret, "lyn_ipe_create_config_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_reset_config_desc")

        # 获取模型信息
        self.model_infos = []
        self.model_nums = 0
        for info in model_config:
            self.model_infos.append(ModelInfo(info['model_path'], info['plugin_path'], info['model_name']))
            self.model_nums += 1

        
        self.batch_size = self.model_infos[0].batch_size
        self.model_width = self.model_infos[0].model_width
        self.model_height = self.model_infos[0].model_height
        self.apu_output_size = self.model_infos[0].apu_output_size

        print(f'self.batch_size = {self.batch_size}')
        print(f'self.model_width = {self.model_width}')
        print(f'self.model_height = {self.model_height}')
        print(f'self.apu_output_size = {self.apu_output_size}')

        self.ipe_output_size = (
                self.model_width
                * self.model_height
                * self.model_infos[0].model_desc.inputTensorAttrArray[0].dims[3]
        )
        
        (
            self.resize_width,
            self.resize_height,
            self.pad_x,
            self.pad_y,
        ) = set_padding_data(
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.model_width,
            self.model_height,
        )
        self.attr.width = self.vdec_out_info.width
        self.attr.height = self.vdec_out_info.height
        
        # 创建对象池
        self.ipe_output_mem_pool = bufferpool.buffer_pool(
            self.ipe_output_size * self.batch_size, 5
        )

        self.venc_recv_pool = bufferpool.buffer_pool(
            self.vdec_out_info.predict_buf_size, 5
        )
        print(f'self.ipe_output_size = {self.ipe_output_size}')
        print(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}')

        # 设置编码参数
        self.venc_attr = sdk.lyn_venc_attr_t()
        ret = sdk.lyn_venc_set_default_params(self.venc_attr)
        common.error_check(ret, "lyn_venc_set_default_params")
        self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264
        self.venc_attr.width = self.vdec_out_info.width
        self.venc_attr.height = self.vdec_out_info.height
        self.venc_attr.bit_depth = 8
        self.venc_attr.bframes_num = 0
        self.venc_attr.pframes_num = 5
        self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12
        self.venc_attr.target_bitrate = 6000000
        self.venc_attr.level = -1
        self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr)
        common.error_check(ret, "lyn_vdec_open")

        self.total_boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))


    def send_thread_func(self, cancel_flag):
        # 设置上下文环境 创建发送stream
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            # 从解封装器读取一个包
            pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl)
            eos = pkt.eos
            if ret != 0 and not eos:
                sdk.lyn_demux_close(self.__demux_hdl)
                time.sleep(500.0 / 1000)
                print("demux failed, reconnecting...")
                self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
                continue

            # 发送给解码器解码
            ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt)
            common.error_check(ret, "lyn_vdec_send_packet_async")
            ret = sdk.lyn_synchronize_stream(self.send_stream)
            common.error_check(ret, "lyn_synchronize_stream")

            # 释放packet内存并通知接收结果
            if not eos:
                sdk.lyn_demux_free_packet(pkt)
                self.__send_num += 1
                self.__send_queue.put(self.__send_num)
            else:
                self.__send_queue.put(-1)
            if cancel_flag.value:
                return

    def recv_thread_func(self, cancel_flag):
        # 设置上下文环境 创建接收stream
        sdk.lyn_set_current_context(self.__ctx)
        frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5)
        while self.__recv_num >= 0:
            self.__recv_num = self.__send_queue.take()

            cb_data = recv_cb_data()
            cb_data.frame.eos = self.__recv_num < 0
            cb_data.frame.data = frame_pool.pop()
            cb_data.frame.size = self.vdec_out_info.predict_buf_size
            cb_data.frame_pool = frame_pool
            cb_data.send_num = self.__send_num
            cb_data.recv_num = self.__recv_num
            cb_data.attr = self.attr
            cb_data.block_queue = self.__ipe_queue
            cb_data.video_frame = self.video_frame
            # 插入接收指令,并添加接收完成回调函数
            ret = sdk.lyn_vdec_recv_frame_async(
                self.recv_stream, self.__vdec_hdl, cb_data.frame
            )
            common.error_check(ret, "lyn_vdec_recv_frame_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.recv_stream, recv_frame_cb, cb_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")
            if cancel_flag.value:
                return

    def ipe_thread_func(self, cancel_flag):
        # 设置上下文环境 创建ipe stream
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            cb_data = self.__ipe_queue.take()
            eos = cb_data.frame.eos
            ipe_out_data = self.ipe_process(cb_data)

            self.model_process(ipe_out_data, cb_data)
            # apu_output_data = self.apu_process(ipe_out_data)
            # self.plugin_process(apu_output_data, cb_data)

            # self.encode_process(cb_data)
            

            if cancel_flag.value:
                return

    def ipe_process(self, cb_data: framepool_cb_data):
        sdk.lyn_set_current_context(self.__ctx)
        ipe_out_data = self.ipe_output_mem_pool.pop()
        # 设置ipe输入
        ret = sdk.lyn_ipe_set_input_pic_desc(
            self.ipe_input_desc,
            cb_data.frame.data,
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.__vdec_attr.output_fmt,
        )
        common.error_check(ret, "lyn_ipe_set_input_pic_desc")
        ret = sdk.lyn_ipe_set_output_pic_data(self.ipe_output_desc, ipe_out_data)
        common.error_check(ret, "lyn_ipe_set_output_pic_data")
        ret = sdk.lyn_ipe_set_resize_config(
            self.ipe_config, self.resize_width, self.resize_height
        )
        common.error_check(ret, "lyn_ipe_set_resize_config")
        ret = sdk.lyn_ipe_set_pad_config(
            self.ipe_config,
            self.pad_y,
            self.pad_x,
            self.pad_y,
            self.pad_x,
            114,
            114,
            114,
        )
        common.error_check(ret, "lyn_ipe_set_pad_config")
        ret = sdk.lyn_ipe_set_c2c_config(
            self.ipe_config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0
        )
        common.error_check(ret, "lyn_ipe_set_c2c_config")
        ret = sdk.lyn_ipe_cal_output_pic_desc(
            self.ipe_output_desc, self.ipe_input_desc, self.ipe_config, 0
        )
        common.error_check(ret, "lyn_ipe_cal_output_pic_desc")
        ret = sdk.lyn_ipe_process_async(
            self.ipe_stream, self.ipe_input_desc, self.ipe_output_desc, self.ipe_config
        )
        common.error_check(ret, "lyn_ipe_process_async")

        return ipe_out_data

    def get_channel_info(self) -> str:
        return f'{self.attr.device_id}_{self.attr.chan_id}'

    
    def model_process(self, ipe_out_data, cb_data):
        for model_info in self.model_infos:
            apu_output_data = self.apu_process(model_info, ipe_out_data)
            self.plugin_process(model_info, apu_output_data, cb_data)
            # self.encode_process(model_info, cb_data)

        common.print_frame_rate(self.get_channel_info())
        
        # self.merge_plugin_process(cb_data)
        self.encode_process(cb_data)

        
        

        ret = sdk.lyn_stream_add_async_callback(
            self.model_infos[-1].apu_stream,
            free_to_pool_callback,
            [self.ipe_output_mem_pool, ipe_out_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

        # ret = sdk.lyn_stream_add_async_callback(
        #         self.model_infos[-1].plugin_stream,
        #         free_to_pool_callback,
        #         [cb_data.frame_pool, cb_data.frame.data],
        # )
        # common.error_check(ret, "lyn_stream_add_async_callback")

    def apu_process(self, model_info, ipe_out_data):
        # 等待IPE处理完成
        ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.apu_stream, self.ipe_event)
        common.error_check(ret, "lyn_stream_wait_event")
        apu_output_data = model_info.apu_output_mem_pool.pop()
        ret = sdk.lyn_execute_model_async(
            model_info.apu_stream,
            model_info.model,
            ipe_out_data,
            apu_output_data,
            self.batch_size,
        )
        common.error_check(ret, "lyn_execute_model_async")
        
        
        return apu_output_data

    def plugin_process(self,model_info, apu_output_data, cb_data):
        # 等待apu处理完成
        ret = sdk.lyn_record_event(model_info.apu_stream, model_info.apu_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.plugin_stream, model_info.apu_event)
        common.error_check(ret, "lyn_stream_wait_event")
        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        apu_data_ptr = pythonapi.PyCapsule_GetPointer(apu_output_data, None)
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)

        post_para = struct.pack(
            '6IH2f?2P',
            self.codec_para.width,
            self.codec_para.height,
            self.model_width,
            self.model_height,
            model_info.class_num,
            500,  # nmsTopK
            model_info.anchor_size,
            0.25,  # score threashold
            0.45,  # nms threshold
            True,  # is pad resize
            apu_data_ptr,
            boxes_info_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream, model_info.plugin, "lynPostProcess", post_para, len(post_para)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream,
            model_info.plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

        # ret = sdk.lyn_stream_add_callback(
        #         model_info.plugin_stream,
        #         show_boxinfo_cb,
        #         [model_info.model_name, model_info.boxes_info],
        #     )

        ret = sdk.lyn_stream_add_async_callback(
            model_info.plugin_stream,
            free_to_pool_callback,
            [model_info.apu_output_mem_pool, apu_output_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")
    
    def merge_plugin_process(self, cb_data):
        plugin = self.model_infos[-1].plugin
        plugin_stream = self.model_infos[-1].plugin_stream

        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]

        target_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        sources_ptrs = [
            pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None) for model_info in self.model_infos
        ]
        while len(sources_ptrs) < 4:
            sources_ptrs.append(0)
        
        box_merge_params = struct.pack(
            "P4PH",                # 格式:目标指针 + 4 个源指针 + 源数量
            target_ptr,             # target 的地址
            *sources_ptrs,          # sources 的地址列表
            len(self.model_infos)   # 源数量
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream, plugin, "mergeBoxes", box_merge_params, len(box_merge_params)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        # ret = sdk.lyn_stream_add_callback(
        #     plugin_stream,
        #     show_boxinfo_cb,
        #     ['all', self.total_boxes_info],
        # )
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream,
            plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

    def encode_process(self, cb_data: cb_data):
        # 在 plugin 处理完之后,进行编码操作
        ret = sdk.lyn_record_event(self.model_infos[-1].plugin_stream, self.plugin_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event)
        common.error_check(ret, "lyn_stream_wait_event")
        if self.enc_head_flag:  # 第一帧编码
            self.enc_head_flag = False
            enc_packet = sdk.lyn_packet_t()
            enc_packet.size = self.vdec_out_info.predict_buf_size
            enc_packet.data = self.venc_recv_pool.pop()
            encode_data = save_file_cb_data()
            encode_data.output_path = self.attr.output_path
            encode_data.packet = enc_packet
            encode_data.recv_pool = self.venc_recv_pool
            encode_data.file_path = self.attr.output_path
            encode_data.video_frame = self.attr.video_frame
            
            ret = sdk.lyn_venc_get_paramsset_async(
                self.venc_recv_stream, self.venc_handle, enc_packet
            )
            common.error_check(ret, "lyn_venc_get_paramsset_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.venc_recv_stream, save_file_cb, encode_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_venc_sendframe_async(
            self.venc_send_stream, self.venc_handle, cb_data.frame
        )
        common.error_check(ret, "lyn_venc_sendframe_async")

        enc_packet = sdk.lyn_packet_t()
        enc_packet.size = self.vdec_out_info.predict_buf_size
        enc_packet.eos = cb_data.frame.eos
        enc_packet.data = self.venc_recv_pool.pop()
        encode_data = save_file_cb_data()
        encode_data.packet = enc_packet
        encode_data.recv_pool = self.venc_recv_pool
        encode_data.file_path = self.attr.output_path
        encode_data.output_path = self.attr.output_path
        encode_data.video_frame = self.attr.video_frame
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_send_stream,
            free_to_pool_callback,
            [cb_data.frame_pool, cb_data.frame.data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")
        ret = sdk.lyn_venc_recvpacket_async(
            self.venc_recv_stream, self.venc_handle, enc_packet
        )
        common.error_check(ret, "lyn_venc_recvpacket_async")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream, save_file_cb, encode_data
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def app_process(self, cb_data: cb_data):
        ret = sdk.lyn_record_event(self.plugin_stream, self.plugin_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(self.app_stream, self.plugin_event)
        common.error_check(ret, "lyn_stream_wait_event")
        if self.attr.show_type == 0:
            ret = sdk.lyn_stream_add_async_callback(
                self.plugin_stream, show_video_cb, cb_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")
        elif self.attr.show_type == 1:
            self.encode_process(cb_data)

    def run(self, cancel_flag):
        # 开启发送线程
        self.__send_thread = threading.Thread(
            target=self.send_thread_func, args=(cancel_flag,)
        )
        self.__send_thread.start()

        # 开启接收线程
        self.__recv_thread = threading.Thread(
            target=self.recv_thread_func, args=(cancel_flag,)
        )
        self.__recv_thread.start()

        # 开启图像处理和推理线程
        self.__ipe_thread = threading.Thread(
            target=self.ipe_thread_func, args=(cancel_flag,)
        )
        self.__ipe_thread.start()

    def close(self):
        if self.__send_thread.is_alive():
            self.__send_thread.join()
        if self.__recv_thread.is_alive():
            self.__recv_thread.join()
        if self.__ipe_thread.is_alive():
            self.__ipe_thread.join()

        ret = sdk.lyn_synchronize_stream(self.send_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.recv_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.ipe_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.apu_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.plugin_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.venc_recv_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.venc_send_stream)
        common.error_check(ret, "lyn_synchronize_stream")

        ret = sdk.lyn_destroy_stream(self.send_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.recv_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.ipe_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.apu_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.plugin_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.venc_recv_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.venc_send_stream)
        common.error_check(ret, "lyn_destroy_stream")

        ret = sdk.lyn_destroy_event(self.ipe_event)
        common.error_check(ret, "lyn_destroy_event")
        ret = sdk.lyn_destroy_event(self.apu_event)
        common.error_check(ret, "lyn_destroy_event")

        # destory ipe desc and config
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_destroy_config_desc")
        ret = sdk.lyn_plugin_unregister(self.plugin)
        common.error_check(ret, "lyn_plugin_unregister")

        # 卸载模型
        
        for model_info in self.model_infos:
            ret = sdk.lyn_unload_model(model_info.model)
            common.error_check(ret, "lyn_unload_model")

        if self.__vdec_hdl != "":
            ret = sdk.lyn_vdec_close(self.__vdec_hdl)
            common.error_check(ret, "lyn_vdec_close")

        if self.__ctx != "":
            ret = sdk.lyn_destroy_context(self.__ctx)
            common.error_check(ret, "lyn_destroy_context")


def main(args, video_frame, device_id: int, channel_id: int) -> None:
    attr = infer_process_attr()
    attr.url = args.input_path
    # attr.output_path = args.output_path
    attr.device_id = device_id
    attr.chan_id = channel_id
    attr.plugin_path = args.plugin_path
    attr.model_path = args.model_path
    attr.video_frame = video_frame
    if not attr.url:
        raise ValueError('input file miss!!!')
    # if not attr.output_path:
    #     raise ValueError('unspecified output path!!!')
    decoder = yolov5(attr)
    decoder.run(cancel_flag)
    decoder.close()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, cancel_process)

    parser = argparse.ArgumentParser(description="RTSP stream input and output paths")
    parser.add_argument('--input_path', type=str, required=True, help='Input path (e.g., RTSP or file path)')
    parser.add_argument('--output_path', type=str, required=True, help='Output path (e.g., RTSP output path)')
    args = parser.parse_args()

    # args = SimpleNamespace()
    # args.input_path = 'rtsp://192.168.10.137:554/11'
    # # args.input_path = '/usr/local/lynxi/sdk/sdk-samples/data/MOT16-09.mp4'
    # args.output_path = 'rtsp://192.168.1.100:8554/live?rtptransport=udp'
    args.plugin_path = 'plugins/libYolov5Plugin.so'
    args.model_path = 'models/yolov5s/Net_0'

    init_output_process(args.output_path)

    device_id = 0
    channel_id = 1
    video_frame = queue.Queue(10)
    channel_thread = threading.Thread(target=main, args=(args, video_frame, device_id, channel_id))
    channel_thread.start()

    push_video(video_frame, cancel_flag)