Newer
Older
lynxi-casic-demo / app.py
import base64
from blockqueue import block_queue
from callback_data_struct import *
from info_query import alarm_upload
import common
import bufferpool

import math
import queue
import sys, os
from types import SimpleNamespace
from typing import List, Tuple
import time
import numpy as np
import pylynchipsdk as sdk
import copy
import threading
import multiprocessing
import signal
import sys
import json
import ctypes
from ctypes import *
import struct

import subprocess
import argparse

# import ffmpeg
try:
    import cv2
except ImportError:
    cv2 = None


def cancel_process(signum, frame):
    global cancel_flag
    cancel_flag.value = True


cancel_flag = multiprocessing.Value('b', False)

last_saved_time = {}
last_push_time = {} # key: output_url, value: time
last_alarm_time = {} # key: device_no, value: model_code-objcet_code: time

class bbox(ctypes.Structure):
    _fields_ = [
        ("xmin", ctypes.c_uint32),
        ("ymin", ctypes.c_uint32),
        ("xmax", ctypes.c_uint32),
        ("ymax", ctypes.c_uint32),
        ("score", ctypes.c_float),
        ("id", ctypes.c_uint32),
        ("label", ctypes.c_wchar * 64),
        ("alarm", ctypes.c_uint8),
        ("model_code", ctypes.c_wchar * 64),
    ]

class Box(ctypes.Structure):
    _fields_ = [
        ("boxesnum", ctypes.c_uint32), 
        ("boxes", ctypes.ARRAY(bbox, 256))
    ]

class SubClass(ctypes.Structure):
    _fields_ = [
        ("model_code", ctypes.c_wchar * 64),
        ("object_id",ctypes.c_uint32),
        ("object_name", ctypes.c_wchar * 64),
        ("conf_threshold", ctypes.c_float),
        ("alarm_threshold", ctypes.c_uint32),
        ("range", ctypes.ARRAY(ctypes.c_float, 8))
    ]

class SubObject(ctypes.Structure):
    _fields_ = [
        ("objectsnum", ctypes.c_uint32), 
        ("objects", ctypes.ARRAY(SubClass, 80))
    ]

class ModelHandler:
    def __init__(self, model_path, plugin_path, model_name,model_code, handle_task, objects, image_width, image_height) -> None:
        self.model_path = model_path
        self.plugin_path = plugin_path
        self.model_name = model_name
        self.model_code = model_code
        self.handle_task = handle_task
        self.objects = objects
        print(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}')

        self.model, ret = sdk.lyn_load_model(self.model_path)
        common.error_check(ret, "lyn_load_model")

        self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path)
        common.error_check(ret, "lyn_plugin_register")

        self.apu_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.plugin_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.apu_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.model_desc, ret = sdk.lyn_model_get_desc(self.model)
        common.error_check(ret, "lyn_model_get_desc")
        self.batch_size = self.model_desc.inputTensorAttrArray[0].batchSize
        self.model_width = self.model_desc.inputTensorAttrArray[0].dims[2]
        self.model_height = self.model_desc.inputTensorAttrArray[0].dims[1]

        self.apu_output_size = self.model_desc.outputDataLen

        self.class_num = self.model_desc.outputTensorAttrArray[0].dims[3] - 5
        self.anchor_size = self.model_desc.outputTensorAttrArray[0].dims[1]
        self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))

        self.sub_objects_info, ret = sdk.lyn_malloc(ctypes.sizeof(SubObject))

        sub_obj = sdk.c_malloc(ctypes.sizeof(SubObject))

        # 获取底层指针
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        raw_ptr = ctypes.pythonapi.PyCapsule_GetPointer(sub_obj, None)
        if not raw_ptr:
            raise ValueError("Failed to extract pointer from PyCapsule")

        # 转换为 ctypes.POINTER(SubObject)
        sub_obj_ptr = ctypes.cast(raw_ptr, ctypes.POINTER(SubObject))

        # 通过 ctypes.POINTER 操作数据
        sub_obj_instance = sub_obj_ptr.contents
        sub_obj_instance.objectsnum = len(self.objects)

        for i in range(sub_obj_instance.objectsnum):
            obj = self.objects[i]
            sub_obj_instance.objects[i].object_id = int(obj.object_code)
            sub_obj_instance.objects[i].object_name = obj.object_name
            sub_obj_instance.objects[i].conf_threshold = obj.conf_threshold
            sub_obj_instance.objects[i].alarm_threshold = obj.alarm_threshold
            sub_obj_instance.objects[i].model_code = obj.model_code
            if obj.range:
                range_values = [float(r) for r in obj.range]
                abs_range = []
                for idx, value in enumerate(range_values):
                    if idx % 2 == 0:  # x 坐标 (索引 0, 2, 4, 6)
                        abs_range.append(value * image_width)
                    else:  # y 坐标 (索引 1, 3, 5, 7)
                        abs_range.append(value * image_height)
                print(abs_range)
                sub_obj_instance.objects[i].range = (c_float * 8)(*abs_range)
            else:
                sub_obj_instance.objects[i].range = (0, 0, 0, 0,0,0,0,0)

        ret = sdk.lyn_memcpy(
            self.sub_objects_info, sub_obj, ctypes.sizeof(SubObject), sdk.lyn_memcpy_dir_t.ClientToServer
        )
        common.error_check(ret, "save_boxinfo_cb lyn_memcpy")

        self.apu_output_mem_pool = bufferpool.buffer_pool(
            self.apu_output_size * self.batch_size, 5
        )
        
        print(f'batch_size = {self.batch_size}')
        print(f'model_width = {self.model_width}')
        print(f'model_height = {self.model_height}')
        print(f'apu_output_size = {self.apu_output_size}')
        print(f'class_num = {self.class_num}')
        print(f'anchor_size = {self.anchor_size}')


def recv_frame_cb(cb_data: cb_data):
    # 放到ipe的处理队列中
    cb_data.block_queue.put(cb_data)
    return 0


def free_to_pool_callback(params):
    vdec_out_pool = params[0]
    data = params[1]
    vdec_out_pool.push(data)
    return 0


def set_even(num):
    num = math.floor(num)
    if num % 2 != 0:
        res = num - 1
    else:
        res = num
    return res


def set_padding_data(vidoe_width, video_height, model_width, model_height):
    if vidoe_width > video_height:
        resize_width = model_width
        pad_x = 0
        resize_height = set_even(int(video_height * model_width / vidoe_width))
        pad_y = (model_height - resize_height) / 2
    else:
        resize_height = model_height
        pad_y = 0
        resize_width = set_even(int(vidoe_width * model_width / video_height))
        pad_x = (model_width - resize_width) / 2

    return int(resize_width), int(resize_height), int(pad_x), int(pad_y)


def show_video_cb(params):
    cb_data: cb_data = params[0]
    model_infos = params[1]

    device_no = cb_data.attr.device_no
    alarm_interval = cb_data.attr.alarm_interval

    if(alarm_interval <= 0):
        return 0

    alarm_infos = []

    for model_info in model_infos:
        boxes_info = model_info.boxes_info
        model_code = model_info.model_code

        dst_img_size = ctypes.sizeof(Box)
        host_buf_arr = np.ones(dst_img_size, dtype=np.uint8)
        host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
        ret = sdk.lyn_memcpy(
            host_buf, boxes_info, dst_img_size, sdk.lyn_memcpy_dir_t.ServerToClient
        )
        common.error_check(ret, "save_boxinfo_cb lyn_memcpy")
        host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
        c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents
        # box_dict = {"boxesnum": c_box.boxesnum, "boxes": []}
        # print(c_box.boxesnum)
        for i in range(c_box.boxesnum):
            bbox_instance = c_box.boxes[i]
            bbox_dict = {
                "xmin": bbox_instance.xmin,
                "ymin": bbox_instance.ymin,
                "xmax": bbox_instance.xmax,
                "ymax": bbox_instance.ymax,
                "score": bbox_instance.score,
                "id": bbox_instance.id,
                "label": bbox_instance.label,
                "alarm": bbox_instance.alarm,
                "model_code": bbox_instance.model_code,
            }
            
            # print(bbox_dict)
            
            if bbox_dict["alarm"]:
                last_time = last_alarm_time.get(device_no, {}).get(f'{model_code}_{bbox_dict["id"]}')
                if last_time is None or (time.time() - last_time > alarm_interval):
                    if any(item["algoModelCode"] == bbox_dict["model_code"] 
                           and item["recognitionTypeCode"] == bbox_dict["id"] 
                           for item in alarm_infos):
                        continue

                    alarm_info = {
                        "alarmTime": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
                        "algoModelCode": bbox_dict["model_code"],
                        "deviceNo": device_no,
                        "recognitionTypeCode": bbox_dict["id"],
                    }
                    # 更新 last_alarm_time 中的时间
                    if device_no not in last_alarm_time:
                        last_alarm_time[device_no] = {}  # 初始化设备号的嵌套字典
                    last_alarm_time[device_no][f'{model_code}_{bbox_dict["id"]}'] = time.time()
                    alarm_infos.append(alarm_info)
    
    if alarm_infos:
        data = np.empty(cb_data.frame.size, dtype=np.uint8)
        data_ptr = sdk.lyn_numpy_to_ptr(data)
        ret = sdk.lyn_memcpy(
            data_ptr,
            cb_data.frame.data,
            cb_data.frame.size,
            sdk.lyn_memcpy_dir_t.ServerToClient,
        )
        common.error_check(ret, "lyn_memcpy")
        yuvImg = np.reshape(
            data, (cb_data.attr.height * 3 // 2, cb_data.attr.width)
        ).astype(np.uint8)
        import cv2
        rgbImg = cv2.cvtColor(yuvImg, cv2.COLOR_YUV2BGR_NV12)

        # 编码为 JPEG 格式
        _, buffer = cv2.imencode('.jpg', rgbImg)
        # 转换为 Base64 并添加头部
        image_base64 = f"data:image/jpeg;base64,{base64.b64encode(buffer).decode('utf-8')}"


        save_dir = "./saved_images"
        device_dir = os.path.join(save_dir, device_no)
        os.makedirs(device_dir, exist_ok=True)
        # 生成存储的文件名
        timestamp = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        file_path = os.path.join(device_dir, f"{timestamp}.jpg")

        cv2.imwrite(file_path, rgbImg)

        for alarm_info in alarm_infos:
            print(alarm_info)
            alarm_info["picBase64"] = image_base64
            alarm_upload(alarm_info)
            
    return 0

def save_file_cb(params):
    cb_data: save_file_cb_data = params[0]
    frame_cb_data: cb_data = params[1]

    packet = cb_data.packet
    data_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(packet)
    common.error_check(ret, "lyn_enc_get_remote_packet_valid_size")
    data = np.zeros(data_size, dtype=np.byte)
    data_ptr = sdk.lyn_numpy_to_ptr(data)
    ret = sdk.lyn_memcpy(
        data_ptr, packet.data, data_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "lyn_memcpy")
    
    cb_data.video_frame.put([data, packet.eos])
    cb_data.recv_pool.push(packet.data)
    return ret



def show_boxinfo_cb(params):
    model_name = params[0]
    boxes_info = params[1]
    dst_img_size = ctypes.sizeof(Box)
    host_buf_arr = np.ones(dst_img_size, dtype=np.uint8)
    host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
    ret = sdk.lyn_memcpy(
        host_buf, boxes_info, dst_img_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "save_boxinfo_cb lyn_memcpy")
    host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
    c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents
    dump_box_json(model_name, c_box)
    return 0


def dump_box_json(model_name, c_box):
    box_dict = {"boxesnum": c_box.boxesnum, "boxes": []}
    for i in range(c_box.boxesnum):
        bbox_instance = c_box.boxes[i]
        bbox_dict = {
            "xmin": bbox_instance.xmin,
            "ymin": bbox_instance.ymin,
            "xmax": bbox_instance.xmax,
            "ymax": bbox_instance.ymax,
            "score": bbox_instance.score,
            "id": bbox_instance.id,
            "label": bbox_instance.label,
            "alarm": bbox_instance.alarm,
            "model_code": bbox_instance.model_code,
        }
        box_dict["boxes"].append(bbox_dict)
    print(box_dict)

class yolov5:
    def __init__(self, attr: device_process_attr) -> None:
        self.attr = ""
        self.__ctx = ""
        self.__send_thread = ""
        self.__recv_thread = ""
        self.__ipe_thread = ""
        self.__demux_hdl = ""
        self.codec_para = ""
        self.__vdec_hdl = ""
        self.__vdec_attr = sdk.lyn_vdec_attr_t()
        self.__send_queue = block_queue()
        self.__ipe_queue = block_queue()
        self.__send_num = 0
        self.__recv_num = 0
        # 创建上下文环境
        self.attr = copy.copy(attr)
        self.video_frame = attr.video_frame
        self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id)
        self.enc_head_flag = True
        common.error_check(ret, "create context")
        ret = sdk.lyn_register_error_handler(common.default_stream_error_handle)
        common.error_check(ret, 'lyn_register_error_handler')

        # 打开解封装器
        self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
        common.error_check(ret, "lyn_demux_open ")
        self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_codec_para ")
        self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_framerate")
        print(f'fps = {self.fps}')

        # 打开解码器
        self.__vdec_attr.codec_id = self.codec_para.codec_id
        self.__vdec_attr.output_fmt = self.attr.output_fmt
        self.__vdec_attr.scale = self.attr.scale
        self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr)
        common.error_check(ret, "lyn_vdec_open ")
        self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info(
            self.codec_para, self.__vdec_attr
        )
        print(f'self.vdec_out_info.width = {self.vdec_out_info.width}')
        print(f'self.vdec_out_info.height = {self.vdec_out_info.height}')
        common.error_check(ret, "lyn_vdec_get_out_info ")
        self.attr.width = self.vdec_out_info.width
        self.attr.height = self.vdec_out_info.height

        # 创建stream、event和ipe desc
        self.send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.ipe_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        # self.apu_stream, ret = sdk.lyn_create_stream()
        # common.error_check(ret, "lyn_create_stream")
        # self.plugin_stream, ret = sdk.lyn_create_stream()
        # common.error_check(ret, "lyn_create_stream")
        self.venc_recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.ipe_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        # self.apu_event, ret = sdk.lyn_create_event()
        # common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc")
        self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc")
        self.ipe_config, ret = sdk.lyn_ipe_create_config_desc()
        common.error_check(ret, "lyn_ipe_create_config_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_reset_config_desc")

        # 获取模型信息
        self.model_infos = []
        self.model_nums = 0
        for info in self.attr.model_configs:
            self.model_infos.append(ModelHandler(f'{info.model_path}/Net_0', 
            f'{info.model_path}/plugin.so', 
            info.model_name,
            info.model_code,
            info.handle_task,
            info.objects,
            self.attr.width,
            self.attr.height))
            self.model_nums += 1

        
        self.batch_size = self.model_infos[0].batch_size
        self.model_width = self.model_infos[0].model_width
        self.model_height = self.model_infos[0].model_height
        self.apu_output_size = self.model_infos[0].apu_output_size

        print(f'self.batch_size = {self.batch_size}')
        print(f'self.model_width = {self.model_width}')
        print(f'self.model_height = {self.model_height}')
        print(f'self.apu_output_size = {self.apu_output_size}')

        self.ipe_output_size = (
                self.model_width
                * self.model_height
                * self.model_infos[0].model_desc.inputTensorAttrArray[0].dims[3]
        )
        
        (
            self.resize_width,
            self.resize_height,
            self.pad_x,
            self.pad_y,
        ) = set_padding_data(
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.model_width,
            self.model_height,
        )

        
        # 创建对象池
        self.ipe_output_mem_pool = bufferpool.buffer_pool(
            self.ipe_output_size * self.batch_size, 5
        )

        self.venc_recv_pool = bufferpool.buffer_pool(
            self.vdec_out_info.predict_buf_size, 5
        )
        print(f'self.ipe_output_size = {self.ipe_output_size}')
        print(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}')

        # 设置编码参数
        self.venc_attr = sdk.lyn_venc_attr_t()
        ret = sdk.lyn_venc_set_default_params(self.venc_attr)
        common.error_check(ret, "lyn_venc_set_default_params")
        self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264
        self.venc_attr.width = self.vdec_out_info.width
        self.venc_attr.height = self.vdec_out_info.height
        self.venc_attr.bit_depth = 8
        self.venc_attr.bframes_num = 0
        self.venc_attr.pframes_num = 5
        self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12
        self.venc_attr.target_bitrate = 6000000
        self.venc_attr.level = -1
        self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr)
        common.error_check(ret, "lyn_vdec_open")

        self.total_boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        self.start_process_time = None
        self.stream_out_process = None
        self.init_output_process()

        
        self.frame_step = 3
        self.frame_idx = frame_step - 1 # 为了处理第0帧,这里不从0开始计数,而是从step-1开始

    
    def init_output_process(self):
        command = ['ffmpeg',
                '-i', '-',
                "-c:v", "copy",
                '-f', 'rtsp',
                '-rtsp_transport', 'tcp',
                self.attr.output_path]

        # 启动FFmpeg子进程
        try:
            self.stream_out_process = subprocess.Popen(command, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as e:
            print(f"Failed to start process: {e}")
        except subprocess.SubprocessError as e:
            print(f"Subprocess error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

        # stream_out_process = (
        # ffmpeg
        # .input('pipe:', format='rawvideo', pix_fmt='bgr24', s='640x360', r=25)
        # .output(output_path, vcodec='libx264',
        #         preset='ultrafast', tune='zerolatency',
        #         bitrate='1000k',maxrate='1000k', bufsize='2000k', f='rtsp',g=10)
        # .run_async(pipe_stdin=True, pipe_stderr=True,overwrite_output=True, cmd=['ffmpeg', '-report'])
        # )

        def read_stderr(process):
            for line in iter(process.stderr.readline, b''):
                print(f"stderr: {line.decode('utf-8')}", end='')

        if self.stream_out_process:
            stderr_thread = threading.Thread(target=read_stderr, args=(self.stream_out_process,))
            stderr_thread.daemon = True
            stderr_thread.start()


    def push_video(self):
        # show window
        global cancel_flag
        global last_push_time

        frame_rate = self.fps  # 目标帧率
        frame_interval = 1 / frame_rate  # 每帧间隔时间
        last_push_time.setdefault(self.attr.output_path, time.time())

        while True:
            try:
                frame = self.video_frame.get(False)
            except:
                if cancel_flag.value:
                    sys.exit()
                else:
                    time.sleep(0.01)
                    continue
            if frame[1] or cancel_flag.value:
                # cv2.destroyAllWindows()
                sys.exit()

            if frame and self.stream_out_process:
                image = frame[0]
                # resized_image = cv2.resize(image, (640, 360))

                self.stream_out_process.stdin.write(image.tobytes())
                # elapsed_time = time.time() - self.start_process_time
                elapsed_time = time.time() - last_push_time[self.attr.output_path]
                sleep_time = frame_interval - elapsed_time - 0.01
                # print(f'elapsed_time = {elapsed_time*1000:.3f}ms, sleep_time = {sleep_time*1000:.3f}ms')
                time.sleep(max(0, sleep_time))
                last_push_time[self.attr.output_path] = time.time()


    def send_thread_func(self, cancel_flag):
        # 设置上下文环境 创建发送stream
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            # 从解封装器读取一个包
            pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl)
            eos = pkt.eos
            if ret != 0 and not eos:
                sdk.lyn_demux_close(self.__demux_hdl)
                time.sleep(500.0 / 1000)
                print("demux failed, reconnecting...")
                self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)

                if not ret:
                    self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl)
                    common.error_check(ret, "lyn_synchronize_stream")
                    print(f'fps = {self.fps}')
                continue

            # 发送给解码器解码
            ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt)
            common.error_check(ret, "lyn_vdec_send_packet_async")
            ret = sdk.lyn_synchronize_stream(self.send_stream)
            common.error_check(ret, "lyn_synchronize_stream")

            # 释放packet内存并通知接收结果
            if not eos:
                sdk.lyn_demux_free_packet(pkt)
                self.__send_num += 1
                self.__send_queue.put(self.__send_num)
            else:
                self.__send_queue.put(-1)
            if cancel_flag.value:
                return

    def recv_thread_func(self, cancel_flag):
        # 设置上下文环境 创建接收stream
        sdk.lyn_set_current_context(self.__ctx)
        frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5)
        while self.__recv_num >= 0:
            self.__recv_num = self.__send_queue.take()

            cb_data = recv_cb_data()
            cb_data.frame.eos = self.__recv_num < 0
            cb_data.frame.data = frame_pool.pop()
            cb_data.frame.size = self.vdec_out_info.predict_buf_size
            cb_data.frame_pool = frame_pool
            cb_data.send_num = self.__send_num
            cb_data.recv_num = self.__recv_num
            cb_data.attr = self.attr
            cb_data.block_queue = self.__ipe_queue
            cb_data.video_frame = self.video_frame
            # 插入接收指令,并添加接收完成回调函数
            ret = sdk.lyn_vdec_recv_frame_async(
                self.recv_stream, self.__vdec_hdl, cb_data.frame
            )
            common.error_check(ret, "lyn_vdec_recv_frame_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.recv_stream, recv_frame_cb, cb_data
            )
            common.error_check(ret, "lyn_stream_add_async_callback")
            if cancel_flag.value:
                return

    def ipe_thread_func(self, cancel_flag):
        # 设置上下文环境 创建ipe stream
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            cb_data = self.__ipe_queue.take()
            eos = cb_data.frame.eos
            self.start_process_time = time.time()
            ipe_out_data = self.ipe_process(cb_data)

            self.model_process(ipe_out_data, cb_data)
            if cancel_flag.value:
                return

    def ipe_process(self, cb_data: framepool_cb_data):
        sdk.lyn_set_current_context(self.__ctx)
        ipe_out_data = self.ipe_output_mem_pool.pop()
        # 设置ipe输入
        ret = sdk.lyn_ipe_set_input_pic_desc(
            self.ipe_input_desc,
            cb_data.frame.data,
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.__vdec_attr.output_fmt,
        )
        common.error_check(ret, "lyn_ipe_set_input_pic_desc")
        ret = sdk.lyn_ipe_set_output_pic_data(self.ipe_output_desc, ipe_out_data)
        common.error_check(ret, "lyn_ipe_set_output_pic_data")
        ret = sdk.lyn_ipe_set_resize_config(
            self.ipe_config, self.resize_width, self.resize_height
        )
        common.error_check(ret, "lyn_ipe_set_resize_config")
        ret = sdk.lyn_ipe_set_pad_config(
            self.ipe_config,
            self.pad_y,
            self.pad_x,
            self.pad_y,
            self.pad_x,
            114,
            114,
            114,
        )
        common.error_check(ret, "lyn_ipe_set_pad_config")
        ret = sdk.lyn_ipe_set_c2c_config(
            self.ipe_config, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0
        )
        common.error_check(ret, "lyn_ipe_set_c2c_config")
        ret = sdk.lyn_ipe_cal_output_pic_desc(
            self.ipe_output_desc, self.ipe_input_desc, self.ipe_config, 0
        )
        common.error_check(ret, "lyn_ipe_cal_output_pic_desc")
        ret = sdk.lyn_ipe_process_async(
            self.ipe_stream, self.ipe_input_desc, self.ipe_output_desc, self.ipe_config
        )
        common.error_check(ret, "lyn_ipe_process_async")

        # ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        # common.error_check(ret, "lyn_record_event")

        return ipe_out_data

    def get_channel_info(self) -> str:
        return f'{self.attr.device_id}_{self.attr.output_path}'

    
    def model_process(self, ipe_out_data, cb_data):
        # self.frame_idx = (frame_idx + 1) % self.frame_step
        # if self.frame_idx == 0:
        #     # 正常处理
        # else:
        #     # 跳过处理,直接用上次的box渲染 

        for model_info in self.model_infos:
            apu_output_data = self.apu_process(model_info, ipe_out_data)
            self.plugin_process(model_info, apu_output_data, cb_data)

        ret = sdk.lyn_stream_add_async_callback(
            self.model_infos[-1].plugin_stream,
            show_video_cb,
            [cb_data, self.model_infos],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

        # self.merge_plugin_process(cb_data)
        self.encode_process(cb_data)

        common.print_frame_rate(self.get_channel_info())

        ret = sdk.lyn_stream_add_async_callback(
            self.model_infos[-1].apu_stream,
            free_to_pool_callback,
            [self.ipe_output_mem_pool, ipe_out_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

        # ret = sdk.lyn_stream_add_async_callback(
        #         self.model_infos[-1].plugin_stream,
        #         free_to_pool_callback,
        #         [cb_data.frame_pool, cb_data.frame.data],
        # )
        # common.error_check(ret, "lyn_stream_add_async_callback")

    def apu_process(self, model_info, ipe_out_data):
        # 等待IPE处理完成
        ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.apu_stream, self.ipe_event)
        common.error_check(ret, "lyn_stream_wait_event")
        apu_output_data = model_info.apu_output_mem_pool.pop()
        ret = sdk.lyn_execute_model_async(
            model_info.apu_stream,
            model_info.model,
            ipe_out_data,
            apu_output_data,
            self.batch_size,
        )
        common.error_check(ret, "lyn_execute_model_async")
        return apu_output_data

    def plugin_process(self,model_info, apu_output_data, cb_data):
        # 等待apu处理完成
        ret = sdk.lyn_record_event(model_info.apu_stream, model_info.apu_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(model_info.plugin_stream, model_info.apu_event)
        common.error_check(ret, "lyn_stream_wait_event")
        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        apu_data_ptr = pythonapi.PyCapsule_GetPointer(apu_output_data, None)
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)
        sub_objects_info_ptr =  pythonapi.PyCapsule_GetPointer(model_info.sub_objects_info, None)

        post_para = struct.pack(
            '6IH2f?3P',
            self.codec_para.width,
            self.codec_para.height,
            self.model_width,
            self.model_height,
            model_info.class_num,
            500,  # nmsTopK
            model_info.anchor_size,
            0.25,  # score threashold
            0.45,  # nms threshold
            True,  # is pad resize
            apu_data_ptr,
            boxes_info_ptr,
            sub_objects_info_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream, model_info.plugin, "lynPostProcess", post_para, len(post_para)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream,
            model_info.plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

        # ret = sdk.lyn_stream_add_callback(
        #         model_info.plugin_stream,
        #         show_boxinfo_cb,
        #         [model_info.model_name, model_info.boxes_info],
        # )

        # ret = sdk.lyn_stream_add_async_callback(
        #     model_info.plugin_stream, show_video_cb, [cb_data,model_info.boxes_info,model_info.model_code]
        # )
        # common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_stream_add_async_callback(
            model_info.plugin_stream,
            free_to_pool_callback,
            [model_info.apu_output_mem_pool, apu_output_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")
    
    def plugin_draw_process(self.model_info, cb_data):
        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None)

        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream,
            model_info.plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")



    def merge_plugin_process(self, cb_data):
        plugin = self.model_infos[-1].plugin
        plugin_stream = self.model_infos[-1].plugin_stream

        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]

        target_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        sources_ptrs = [
            pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None) for model_info in self.model_infos
        ]
        while len(sources_ptrs) < 4:
            sources_ptrs.append(0)
        
        box_merge_params = struct.pack(
            "P4PH",                # 格式:目标指针 + 4 个源指针 + 源数量
            target_ptr,             # target 的地址
            *sources_ptrs,          # sources 的地址列表
            len(self.model_infos)   # 源数量
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream, plugin, "mergeBoxes", box_merge_params, len(box_merge_params)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        # ret = sdk.lyn_stream_add_callback(
        #     plugin_stream,
        #     show_boxinfo_cb,
        #     ['all', self.total_boxes_info],
        # )
        boxes_info_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        format = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            format,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream,
            plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

        ret = sdk.lyn_stream_add_async_callback(
            plugin_stream, show_video_cb, [cb_data,self.total_boxes_info]
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def encode_process(self, cb_data: cb_data):
        # 在 plugin 处理完之后,进行编码操作
        ret = sdk.lyn_record_event(self.model_infos[-1].plugin_stream, self.plugin_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event)
        common.error_check(ret, "lyn_stream_wait_event")
        if self.enc_head_flag:  # 第一帧编码
            self.enc_head_flag = False
            enc_packet = sdk.lyn_packet_t()
            enc_packet.size = self.vdec_out_info.predict_buf_size
            enc_packet.data = self.venc_recv_pool.pop()
            encode_data = save_file_cb_data()
            encode_data.output_path = self.attr.output_path
            encode_data.packet = enc_packet
            encode_data.recv_pool = self.venc_recv_pool
            encode_data.file_path = self.attr.output_path
            encode_data.video_frame = self.attr.video_frame
            
            ret = sdk.lyn_venc_get_paramsset_async(
                self.venc_recv_stream, self.venc_handle, enc_packet
            )
            common.error_check(ret, "lyn_venc_get_paramsset_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.venc_recv_stream, save_file_cb, [encode_data, cb_data]
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_venc_sendframe_async(
            self.venc_send_stream, self.venc_handle, cb_data.frame
        )
        common.error_check(ret, "lyn_venc_sendframe_async")

        enc_packet = sdk.lyn_packet_t()
        enc_packet.size = self.vdec_out_info.predict_buf_size
        enc_packet.eos = cb_data.frame.eos
        enc_packet.data = self.venc_recv_pool.pop()
        encode_data = save_file_cb_data()
        encode_data.packet = enc_packet
        encode_data.recv_pool = self.venc_recv_pool
        encode_data.file_path = self.attr.output_path
        encode_data.output_path = self.attr.output_path
        encode_data.video_frame = self.attr.video_frame
        # ret = sdk.lyn_stream_add_async_callback(
        #     self.venc_send_stream,
        #     free_to_pool_callback,
        #     [cb_data.frame_pool, cb_data.frame.data],
        # )
        # common.error_check(ret, "lyn_stream_add_async_callback")
        ret = sdk.lyn_venc_recvpacket_async(
            self.venc_recv_stream, self.venc_handle, enc_packet
        )
        common.error_check(ret, "lyn_venc_recvpacket_async")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream, save_file_cb, [encode_data, cb_data]
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream,
            free_to_pool_callback,
            [cb_data.frame_pool, cb_data.frame.data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def run(self):
        global cancel_flag
        # 开启发送线程
        self.__send_thread = threading.Thread(
            target=self.send_thread_func, args=(cancel_flag,)
        )
        self.__send_thread.start()

        # 开启接收线程
        self.__recv_thread = threading.Thread(
            target=self.recv_thread_func, args=(cancel_flag,)
        )
        self.__recv_thread.start()

        # 开启图像处理和推理线程
        self.__ipe_thread = threading.Thread(
            target=self.ipe_thread_func, args=(cancel_flag,)
        )
        self.__ipe_thread.start()

    def close(self):
        if self.__send_thread.is_alive():
            self.__send_thread.join()
        if self.__recv_thread.is_alive():
            self.__recv_thread.join()
        if self.__ipe_thread.is_alive():
            self.__ipe_thread.join()

        ret = sdk.lyn_synchronize_stream(self.send_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.recv_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.ipe_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.apu_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.plugin_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.venc_recv_stream)
        common.error_check(ret, "lyn_synchronize_stream")
        ret = sdk.lyn_synchronize_stream(self.venc_send_stream)
        common.error_check(ret, "lyn_synchronize_stream")

        ret = sdk.lyn_destroy_stream(self.send_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.recv_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.ipe_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.apu_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.plugin_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.venc_recv_stream)
        common.error_check(ret, "lyn_destroy_stream")
        ret = sdk.lyn_destroy_stream(self.venc_send_stream)
        common.error_check(ret, "lyn_destroy_stream")

        ret = sdk.lyn_destroy_event(self.ipe_event)
        common.error_check(ret, "lyn_destroy_event")
        ret = sdk.lyn_destroy_event(self.apu_event)
        common.error_check(ret, "lyn_destroy_event")

        # destory ipe desc and config
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_destroy_config_desc")
        ret = sdk.lyn_plugin_unregister(self.plugin)
        common.error_check(ret, "lyn_plugin_unregister")

        # 卸载模型
        
        for model_info in self.model_infos:
            ret = sdk.lyn_unload_model(model_info.model)
            common.error_check(ret, "lyn_unload_model")

        if self.__vdec_hdl != "":
            ret = sdk.lyn_vdec_close(self.__vdec_hdl)
            common.error_check(ret, "lyn_vdec_close")

        if self.__ctx != "":
            ret = sdk.lyn_destroy_context(self.__ctx)
            common.error_check(ret, "lyn_destroy_context")


def run_device(device_no:str, input_url: str, output_url:str, device_id: int, model_configs, alarm_interval, threads) -> None:
    attr = device_process_attr()
    attr.device_no = device_no
    attr.url = input_url
    attr.output_path = output_url
    attr.device_id = device_id
    attr.alarm_interval = alarm_interval
    attr.video_frame = queue.Queue(10)
    attr.model_configs = model_configs
    if not attr.url:
        raise ValueError('input file miss!!!')
    if not attr.output_path:
        raise ValueError('unspecified output path!!!')

    decoder = yolov5(attr)

    infer_thread = threading.Thread(target=decoder.run, args=())
    threads.append(infer_thread)
    infer_thread.start()

    out_thread = threading.Thread(target=decoder.push_video, args=())
    threads.append(out_thread)
    out_thread.start()

    # decoder.close()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, cancel_process)

    devices = [
        {
            'input_url': 'rtsp://admin:Casic203@192.168.83.64:554/Streaming/Channels/101',
            'output_url': 'rtmp://192.168.83.100:1935/live/101',
            'model_configs':  [
                {
                    'model_path': 'models/yolov5s/Net_0',
                    'plugin_path': 'plugins/libYolov5Plugin.so',
                    'model_name': 'yolov5s',
                },
                {
                    'model_path': 'models/yuntai/Net_0',
                    'plugin_path': 'plugins/libMultiPlugin.so',
                    'model_name': 'yuntai',
                }
            ]
        },
        {
            'input_url': 'rtsp://admin:Casic203@192.168.83.64:554/Streaming/Channels/101',
            'output_url': 'rtmp://192.168.83.100:1935/live/102',
            'model_configs':  [
                {
                    'model_path': 'models/yolov5s/Net_0',
                    'plugin_path': 'plugins/libYolov5Plugin.so',
                    'model_name': 'yolov5s',
                },
                {
                    'model_path': 'models/yuntai/Net_0',
                    'plugin_path': 'plugins/libMultiPlugin.so',
                    'model_name': 'yuntai',
                }
            ]
        },
        #         {
        #     'input_url': 'rtsp://admin:Casic203@192.168.1.64:554/Streaming/Channels/101',
        #     'output_url': 'rtsp://192.168.1.100:8554/live/test3',
        #     'model_configs':  [
        #         {
        #             'model_path': 'models/yolov5s/Net_0',
        #             'plugin_path': 'plugins/libYolov5Plugin.so',
        #             'model_name': 'yolov5s',
        #         },
        #         {
        #             'model_path': 'models/yuntai/Net_0',
        #             'plugin_path': 'plugins/libMultiPlugin.so',
        #             'model_name': 'yuntai',
        #         }
        #     ]
        # },
        #         {
        #     'input_url': 'rtsp://admin:Casic203@192.168.1.64:554/Streaming/Channels/101',
        #     'output_url': 'rtsp://192.168.1.100:8554/live/test4',
        #     'model_configs':  [
        #         {
        #             'model_path': 'models/yolov5s/Net_0',
        #             'plugin_path': 'plugins/libYolov5Plugin.so',
        #             'model_name': 'yolov5s',
        #         },
        #         {
        #             'model_path': 'models/yuntai/Net_0',
        #             'plugin_path': 'plugins/libMultiPlugin.so',
        #             'model_name': 'yuntai',
        #         }
        #     ]
        # }
    ]

    device_count, ret = sdk.lyn_get_device_count()
    if ret != 0 or device_count <= 0:
        print("Error: Unable to fetch device count or no devices available.")
        exit(1)

    threads = []
    for idx, device in enumerate(devices):
        # 通过取余计算均匀分配的 device_id,设备 ID 从 0 开始
        device_id = idx % device_count
        run_device(device['input_url'],device['output_url'],device_id, device['model_configs'], threads)
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()