# Newer
# Older
# lynxi-casic-demo / app.py
# zhangyingjie on 24 Jan 43 KB 增加后台接口调用
import base64
import importlib
from blockqueue import block_queue
from callback_data_struct import *
from constants import ALARM_URI, SERVER_BASE_URL
from detect_utils import bbox, Box, SubObject, SubClass, free_to_pool_callback, recv_frame_cb, set_padding_data
from http_tool import HttpTool
import common
import bufferpool
from global_logger import logger

import math
import queue
import sys, os
from types import SimpleNamespace
from typing import List, Tuple
import time
import numpy as np
import pylynchipsdk as sdk
import copy
import threading
import multiprocessing
import signal
import sys
import json
import ctypes
from ctypes import *
import struct

import subprocess
import argparse


from string_utils import camel_to_snake, get_class, get_fun

# import ffmpeg
try:
    import cv2
except ImportError:
    cv2 = None


def cancel_process(signum, frame):
    """Request shutdown by raising the module-level cancel flag.

    The ``(signum, frame)`` signature matches what ``signal.signal`` passes to
    a handler; neither argument is used.

    Args:
        signum: Signal number (ignored).
        frame: Interrupted stack frame (ignored).
    """
    # Only an attribute of the shared object is written, so the module-level
    # name is reachable without a ``global`` declaration.
    cancel_flag.value = True


# Shared cancellation flag: a multiprocessing.Value with typecode 'b' that
# starts out False. cancel_process() sets it and push_video() polls it.
cancel_flag = multiprocessing.Value('b', False)
# Module-wide HTTP client; alarm_upload() posts alarms through it.
http_tool = HttpTool()


class ModelHandler:
    """Device-side resources for one detection model and its plugin.

    The constructor loads the model and registers the plugin, creates a
    dedicated pair of streams and events, reads the tensor geometry from the
    model description, allocates device buffers for the detected boxes and for
    the per-object configuration, uploads that configuration, and creates the
    pool of inference output buffers.
    """

    def __init__(self, model_path, plugin_path, model_name, model_code, handle_task, objects, image_width, image_height) -> None:
        """Load the model/plugin and upload the per-object configuration.

        Args:
            model_path: Path handed to ``sdk.lyn_load_model``.
            plugin_path: Path handed to ``sdk.lyn_plugin_register``.
            model_name: Name of the model (logged, stored on the instance).
            model_code: Code of the model (stored on the instance).
            handle_task: Name of the alarm handler module; a falsy value
                selects ``"base_model_handler"``.
            objects: Per-object settings. Each entry provides ``object_code``,
                ``object_name``, ``conf_threshold``, ``alarm_threshold``,
                ``model_code`` and ``range``.
            image_width: Frame width used to scale ``range`` x coordinates.
            image_height: Frame height used to scale ``range`` y coordinates.

        Raises:
            ValueError: If the host staging buffer pointer cannot be read.
        """
        self.model_path = model_path
        self.plugin_path = plugin_path
        self.model_name = model_name
        self.model_code = model_code
        self.handle_task = handle_task if handle_task else "base_model_handler"
        self.objects = objects
        logger.info(f'init model: model_path = {self.model_path}, plugin_path = {self.plugin_path}, model_name = {self.model_name}')

        self.model, ret = sdk.lyn_load_model(self.model_path)
        common.error_check(ret, "lyn_load_model")

        self.plugin, ret = sdk.lyn_plugin_register(self.plugin_path)
        common.error_check(ret, "lyn_plugin_register")

        self.apu_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.plugin_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.apu_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.model_desc, ret = sdk.lyn_model_get_desc(self.model)
        common.error_check(ret, "lyn_model_get_desc")
        self.batch_size = self.model_desc.inputTensorAttrArray[0].batchSize
        self.model_width = self.model_desc.inputTensorAttrArray[0].dims[2]
        self.model_height = self.model_desc.inputTensorAttrArray[0].dims[1]

        self.apu_output_size = self.model_desc.outputDataLen

        # NOTE(review): the "- 5" presumably strips the box/objectness entries
        # of a YOLO-style output row -- confirm against the model export.
        self.class_num = self.model_desc.outputTensorAttrArray[0].dims[3] - 5
        self.anchor_size = self.model_desc.outputTensorAttrArray[0].dims[1]

        # Both allocations are checked like every other SDK call here; the
        # original ignored their return codes.
        self.boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc boxes_info")

        self.sub_objects_info, ret = sdk.lyn_malloc(ctypes.sizeof(SubObject))
        common.error_check(ret, "lyn_malloc sub_objects_info")

        sub_obj = sdk.c_malloc(ctypes.sizeof(SubObject))

        # Extract the raw pointer held by the capsule.
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]
        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        raw_ptr = ctypes.pythonapi.PyCapsule_GetPointer(sub_obj, None)
        if not raw_ptr:
            raise ValueError("Failed to extract pointer from PyCapsule")

        # View the staging buffer as a SubObject and fill it in place.
        sub_obj_ptr = ctypes.cast(raw_ptr, ctypes.POINTER(SubObject))
        sub_obj_instance = sub_obj_ptr.contents
        sub_obj_instance.objectsnum = len(self.objects)

        for i, obj in enumerate(self.objects):
            sub_obj_instance.objects[i].object_id = int(obj.object_code)
            sub_obj_instance.objects[i].object_name = obj.object_name
            sub_obj_instance.objects[i].conf_threshold = obj.conf_threshold
            sub_obj_instance.objects[i].alarm_threshold = obj.alarm_threshold
            sub_obj_instance.objects[i].model_code = obj.model_code
            if obj.range:
                # Flatten the boundary points to (x1 y1 x2 y2 x3 y3 x4 y4),
                # scaling each coordinate by the frame size.
                abs_range = []
                for boundary_point in obj.range:
                    abs_range.append(boundary_point.x * image_width)
                    abs_range.append(boundary_point.y * image_height)
                logger.debug(f'abs_range = {abs_range}')
                sub_obj_instance.objects[i].range = (c_float * 8)(*abs_range)
            else:
                sub_obj_instance.objects[i].range = (0, 0, 0, 0, 0, 0, 0, 0)

        # Upload the filled structure into the device buffer.
        ret = sdk.lyn_memcpy(
            self.sub_objects_info, sub_obj, ctypes.sizeof(SubObject), sdk.lyn_memcpy_dir_t.ClientToServer
        )
        common.error_check(ret, "ModelHandler lyn_memcpy sub_objects_info")

        self.apu_output_mem_pool = bufferpool.buffer_pool(
            self.apu_output_size * self.batch_size, 5
        )

        logger.debug(f'batch_size = {self.batch_size}')
        logger.debug(f'model_width = {self.model_width}')
        logger.debug(f'model_height = {self.model_height}')
        logger.debug(f'apu_output_size = {self.apu_output_size}')
        logger.debug(f'class_num = {self.class_num}')
        logger.debug(f'anchor_size = {self.anchor_size}')




def alarm_upload(data):
    """POST one alarm payload to the backend alarm endpoint.

    The target is ``SERVER_BASE_URL`` followed by ``ALARM_URI``; the request is
    sent through the module-level ``http_tool`` with ``need_token=True``.

    Args:
        data: Payload passed to ``http_tool.post`` unchanged.
    """
    endpoint = f'{SERVER_BASE_URL}{ALARM_URI}'
    http_tool.post(url=endpoint, data=data, need_token=True)


def show_video_cb(params):
    """Stream callback: collect detections from every model and upload alarms.

    For each model handler the device-side ``Box`` buffer is copied to the
    host and converted to a list of dicts, which is handed to that handler's
    ``handle_alarm_info`` function. When any alarm is produced, the current
    frame is copied from the device, converted from NV12 to BGR, saved as a
    JPEG under ``./saved_images/<device_no>/`` and attached (Base64) to every
    alarm before it is uploaded.

    Args:
        params: ``[cb_data, model_infos, last_alarm_time]`` -- the frame
            callback data, the list of model handlers, and the dict of last
            alarm times passed through to ``handle_alarm_info``.

    Returns:
        Always 0. Nothing is done when ``alarm_interval`` is not positive.
    """
    frame_ctx = params[0]
    model_infos = params[1]
    last_alarm_time = params[2]

    device_no = frame_ctx.attr.device_no
    alarm_interval = frame_ctx.attr.alarm_interval

    if alarm_interval <= 0:
        return 0

    # ctypes assumes a C int return type unless told otherwise, which would
    # truncate a 64-bit pointer. Declare the prototype here instead of relying
    # on another code path having done so first.
    pythonapi.PyCapsule_GetPointer.restype = c_void_p
    pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]

    box_bytes = ctypes.sizeof(Box)
    alarm_infos = []

    for model_info in model_infos:
        # The copy below fills the buffer, so it does not need initialising.
        host_buf_arr = np.empty(box_bytes, dtype=np.uint8)
        host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
        ret = sdk.lyn_memcpy(
            host_buf, model_info.boxes_info, box_bytes, sdk.lyn_memcpy_dir_t.ServerToClient
        )
        common.error_check(ret, "save_boxinfo_cb lyn_memcpy")
        host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
        c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents

        bbox_dicts = []
        for i in range(c_box.boxesnum):
            bbox_instance = c_box.boxes[i]
            bbox_dicts.append({
                "xmin": bbox_instance.xmin,
                "ymin": bbox_instance.ymin,
                "xmax": bbox_instance.xmax,
                "ymax": bbox_instance.ymax,
                "score": bbox_instance.score,
                "id": bbox_instance.id,
                "label": bbox_instance.label,
                "alarm": bbox_instance.alarm,
                "model_code": bbox_instance.model_code,
            })

        # Each model names the module whose handle_alarm_info decides which
        # detections become alarms.
        handle_task_name = model_info.handle_task
        handle_alarm_info = get_fun(f'model_handler.{handle_task_name}', 'handle_alarm_info')
        alarm_infos.extend(handle_alarm_info(bbox_dicts, last_alarm_time, device_no, alarm_interval))

    if alarm_infos:
        data = np.empty(frame_ctx.frame.size, dtype=np.uint8)
        data_ptr = sdk.lyn_numpy_to_ptr(data)
        ret = sdk.lyn_memcpy(
            data_ptr,
            frame_ctx.frame.data,
            frame_ctx.frame.size,
            sdk.lyn_memcpy_dir_t.ServerToClient,
        )
        common.error_check(ret, "lyn_memcpy")
        # ``data`` is already uint8, so reshaping is enough (no extra copy).
        yuvImg = np.reshape(
            data, (frame_ctx.attr.height * 3 // 2, frame_ctx.attr.width)
        )
        # Imported here so a missing OpenCV only fails when an alarm image is
        # actually needed (the module-level import is optional).
        import cv2
        rgbImg = cv2.cvtColor(yuvImg, cv2.COLOR_YUV2BGR_NV12)

        # Encode as JPEG, then wrap as a Base64 data URI.
        _, buffer = cv2.imencode('.jpg', rgbImg)
        image_base64 = f"data:image/jpeg;base64,{base64.b64encode(buffer).decode('utf-8')}"

        save_dir = "./saved_images"
        device_dir = os.path.join(save_dir, device_no)
        os.makedirs(device_dir, exist_ok=True)
        # File name is the local time at one-second resolution.
        timestamp = time.strftime('%Y%m%d_%H%M%S', time.localtime())
        file_path = os.path.join(device_dir, f"{timestamp}.jpg")

        cv2.imwrite(file_path, rgbImg)

        for alarm_info in alarm_infos:
            logger.debug(alarm_info)
            alarm_info["picBase64"] = image_base64
            alarm_upload(alarm_info)

    return 0

def save_file_cb(params):
    """Encoder callback: copy one encoded packet to the host and queue it.

    The valid bytes of the packet are copied from the device, pushed onto
    ``video_frame`` together with the packet's EOS flag, and the device buffer
    is handed back to ``recv_pool``.

    Args:
        params: ``[save_file_cb_data, cb_data]``; only the first entry is used.

    Returns:
        The return code of the device-to-host copy.
    """
    enc_ctx = params[0]
    _frame_ctx = params[1]  # part of the callback contract, not used here

    enc_packet = enc_ctx.packet
    valid_size, ret = sdk.lyn_enc_get_remote_packet_valid_size(enc_packet)
    common.error_check(ret, "lyn_enc_get_remote_packet_valid_size")

    host_bytes = np.zeros(valid_size, dtype=np.byte)
    ret = sdk.lyn_memcpy(
        sdk.lyn_numpy_to_ptr(host_bytes),
        enc_packet.data,
        valid_size,
        sdk.lyn_memcpy_dir_t.ServerToClient,
    )
    common.error_check(ret, "lyn_memcpy")

    enc_ctx.video_frame.put([host_bytes, enc_packet.eos])
    # The device buffer can be reused once its content is on the host.
    enc_ctx.recv_pool.push(enc_packet.data)
    return ret



def show_boxinfo_cb(params):
    """Stream callback: copy one ``Box`` buffer to the host and log it.

    Args:
        params: ``[model_name, boxes_info]`` -- a label for the log output and
            the device buffer holding a ``Box`` structure.

    Returns:
        Always 0.
    """
    model_name = params[0]
    boxes_info = params[1]

    # ctypes assumes a C int return type unless told otherwise, which would
    # truncate a 64-bit pointer. Declare the prototype here instead of relying
    # on another code path having done so first.
    pythonapi.PyCapsule_GetPointer.restype = c_void_p
    pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]

    dst_img_size = ctypes.sizeof(Box)
    host_buf_arr = np.ones(dst_img_size, dtype=np.uint8)
    host_buf = sdk.lyn_numpy_to_ptr(host_buf_arr)
    ret = sdk.lyn_memcpy(
        host_buf, boxes_info, dst_img_size, sdk.lyn_memcpy_dir_t.ServerToClient
    )
    common.error_check(ret, "save_boxinfo_cb lyn_memcpy")
    host_buf_c = pythonapi.PyCapsule_GetPointer(host_buf, None)
    c_box = ctypes.cast(host_buf_c, ctypes.POINTER(Box)).contents
    dump_box_json(model_name, c_box)
    return 0


def dump_box_json(model_name, c_box):
    """Log the contents of a ``Box`` structure as a plain dict at debug level.

    Args:
        model_name: Accepted for the caller's convenience; not used.
        c_box: Object exposing ``boxesnum`` and an indexable ``boxes``; the
            first ``boxesnum`` entries are logged.
    """
    field_names = (
        "xmin", "ymin", "xmax", "ymax", "score",
        "id", "label", "alarm", "model_code",
    )
    box_dict = {"boxesnum": c_box.boxesnum, "boxes": []}
    for index in range(c_box.boxesnum):
        entry = c_box.boxes[index]
        # Read the fields in the same order they appear in the logged dict.
        box_dict["boxes"].append({name: getattr(entry, name) for name in field_names})
    logger.debug(box_dict)

class yolov5:
    def __init__(self, attr: device_process_attr) -> None:
        """Set up the decode -> inference -> encode pipeline for one channel.

        Opens the demuxer and decoder for ``attr.url``, creates the SDK
        streams, events and IPE descriptors, builds one ``ModelHandler`` per
        entry of ``attr.model_configs``, allocates the buffer pools, opens the
        H.264 encoder and starts the ffmpeg output process.

        Args:
            attr: Channel configuration. A shallow copy is kept in
                ``self.attr`` and its ``width``/``height`` are overwritten
                with the decoder's output size.
        """
        # Placeholder values; the real objects are assigned further down
        # (the thread attributes are not assigned in this method).
        self.attr = ""
        self.__ctx = ""
        self.__send_thread = ""
        self.__recv_thread = ""
        self.__ipe_thread = ""
        self.__demux_hdl = ""
        self.codec_para = ""
        self.__vdec_hdl = ""
        self.__vdec_attr = sdk.lyn_vdec_attr_t()
        self.__send_queue = block_queue()
        self.__ipe_queue = block_queue()
        self.__send_num = 0
        self.__recv_num = 0
        # Create the device context.
        self.attr = copy.copy(attr)
        self.video_frame = attr.video_frame
        self.__ctx, ret = sdk.lyn_create_context(self.attr.device_id)
        self.enc_head_flag = True
        common.error_check(ret, "create context")
        ret = sdk.lyn_register_error_handler(common.default_stream_error_handle)
        common.error_check(ret, 'lyn_register_error_handler')

        # Open the demuxer.
        self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)
        common.error_check(ret, "lyn_demux_open ")
        self.codec_para, ret = sdk.lyn_demux_get_codec_para(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_codec_para ")
        self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl)
        common.error_check(ret, "lyn_demux_get_framerate")
        logger.debug(f'fps = {self.fps}')

        # Open the decoder.
        self.__vdec_attr.codec_id = self.codec_para.codec_id
        self.__vdec_attr.output_fmt = self.attr.output_fmt
        self.__vdec_attr.scale = self.attr.scale
        self.__vdec_hdl, ret = sdk.lyn_vdec_open(self.__vdec_attr)
        common.error_check(ret, "lyn_vdec_open ")
        self.vdec_out_info, ret = sdk.lyn_vdec_get_out_info(
            self.codec_para, self.__vdec_attr
        )
        # Check the return code before touching the result (the original
        # logged the fields first).
        common.error_check(ret, "lyn_vdec_get_out_info ")
        logger.debug(f'self.vdec_out_info.width = {self.vdec_out_info.width}')
        logger.debug(f'self.vdec_out_info.height = {self.vdec_out_info.height}')
        self.attr.width = self.vdec_out_info.width
        self.attr.height = self.vdec_out_info.height

        # Create the streams, events and IPE descriptors.
        self.send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.ipe_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_recv_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")
        self.venc_send_stream, ret = sdk.lyn_create_stream()
        common.error_check(ret, "lyn_create_stream")

        self.ipe_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")
        self.plugin_event, ret = sdk.lyn_create_event()
        common.error_check(ret, "lyn_create_event")

        self.ipe_input_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_input_desc")
        self.ipe_output_desc, ret = sdk.lyn_ipe_create_pic_desc()
        common.error_check(ret, "lyn_ipe_create_pic_desc ipe_output_desc")
        self.ipe_config, ret = sdk.lyn_ipe_create_config_desc()
        common.error_check(ret, "lyn_ipe_create_config_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_input_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_pic_desc(self.ipe_output_desc)
        common.error_check(ret, "lyn_ipe_reset_pic_desc")
        ret = sdk.lyn_ipe_reset_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_reset_config_desc")

        # Load every configured model. Model and plugin are looked up under
        # ./models/<basename of model_path without extension>/.
        self.model_infos = []
        self.model_nums = 0
        for info in self.attr.model_configs:
            model_name = f'./models/{os.path.splitext(os.path.basename(info.model_path))[0]}'
            self.model_infos.append(ModelHandler(
                f'{model_name}/Net_0',
                f'{model_name}/plugin.so',
                info.model_name,
                info.model_code,
                info.handle_task,
                info.objects,
                self.attr.width,
                self.attr.height,
            ))
            self.model_nums += 1

        # The first model's geometry is used for the shared preprocessing.
        self.batch_size = self.model_infos[0].batch_size
        self.model_width = self.model_infos[0].model_width
        self.model_height = self.model_infos[0].model_height
        self.apu_output_size = self.model_infos[0].apu_output_size

        logger.debug(f'self.batch_size = {self.batch_size}')
        logger.debug(f'self.model_width = {self.model_width}')
        logger.debug(f'self.model_height = {self.model_height}')
        logger.debug(f'self.apu_output_size = {self.apu_output_size}')

        self.ipe_output_size = (
                self.model_width
                * self.model_height
                * self.model_infos[0].model_desc.inputTensorAttrArray[0].dims[3]
        )

        (
            self.resize_width,
            self.resize_height,
            self.pad_x,
            self.pad_y,
        ) = set_padding_data(
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.model_width,
            self.model_height,
        )

        # Create the buffer pools.
        self.ipe_output_mem_pool = bufferpool.buffer_pool(
            self.ipe_output_size * self.batch_size, 5
        )

        self.venc_recv_pool = bufferpool.buffer_pool(
            self.vdec_out_info.predict_buf_size, 5
        )
        logger.debug(f'self.ipe_output_size = {self.ipe_output_size}')
        logger.debug(f'self.vdec_out_info.predict_buf_size = {self.vdec_out_info.predict_buf_size}')

        # Configure and open the encoder.
        self.venc_attr = sdk.lyn_venc_attr_t()
        ret = sdk.lyn_venc_set_default_params(self.venc_attr)
        common.error_check(ret, "lyn_venc_set_default_params")
        self.venc_attr.codec_type = sdk.lyn_codec_id_t.LYN_CODEC_ID_H264
        self.venc_attr.width = self.vdec_out_info.width
        self.venc_attr.height = self.vdec_out_info.height
        self.venc_attr.bit_depth = 8
        self.venc_attr.bframes_num = 0
        self.venc_attr.pframes_num = 5
        self.venc_attr.input_format = sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12
        self.venc_attr.target_bitrate = 6000000
        self.venc_attr.level = -1
        self.venc_handle, ret = sdk.lyn_venc_open(self.venc_attr)
        # Label corrected: this checks the encoder, not the decoder.
        common.error_check(ret, "lyn_venc_open")

        self.total_boxes_info, ret = sdk.lyn_malloc(ctypes.sizeof(Box))
        common.error_check(ret, "lyn_malloc total_boxes_info")
        self.start_process_time = None
        self.last_push_time = None
        self.stream_out_process = None
        self.init_output_process()

        self.frame_step = 5
        # Start at step - 1 rather than 0 so that the very first frame is the
        # one that gets processed (the counter is advanced before the test).
        self.frame_idx = self.frame_step - 1

        self.last_alarm_time = {}  # key: model_code-object_code, value: time

    
    def init_output_process(self):
        """Start the ffmpeg child that republishes the encoded stream via RTSP.

        ffmpeg reads the stream from its stdin, copies the video codec and
        publishes over RTSP/TCP to ``self.attr.output_path``. A start-up
        failure is logged and leaves ``self.stream_out_process`` untouched.
        When a process is available, a daemon thread forwards its stderr to
        the logger line by line.
        """
        def forward_stderr(child):
            # readline() yields b'' once the pipe is closed, ending the loop.
            for raw_line in iter(child.stderr.readline, b''):
                logger.info(f"stderr: {raw_line.decode('utf-8').strip()}")

        ffmpeg_cmd = [
            'ffmpeg',
            '-i', '-',
            "-c:v", "copy",
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            self.attr.output_path,
        ]

        # Launch the ffmpeg child process.
        try:
            self.stream_out_process = subprocess.Popen(
                ffmpeg_cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE
            )
        except OSError as e:
            logger.exception(f"Failed to start process: {e}")
        except subprocess.SubprocessError as e:
            logger.exception(f"Subprocess error: {e}")
        except Exception as e:
            logger.exception(f"An unexpected error occurred: {e}")

        if not self.stream_out_process:
            return

        threading.Thread(
            target=forward_stderr,
            args=(self.stream_out_process,),
            daemon=True,
        ).start()


    def push_video(self):
        # show window
        global cancel_flag
        

        frame_rate = self.fps  # 目标帧率
        frame_interval = 1 / frame_rate  # 每帧间隔时间

        if not self.last_push_time:
            self.last_push_time = time.time()

        while True:
            try:
                frame = self.video_frame.get(False)
            except:
                if cancel_flag.value:
                    sys.exit()
                else:
                    time.sleep(0.01)
                    continue
            if frame[1] or cancel_flag.value:
                # cv2.destroyAllWindows()
                sys.exit()

            if frame and self.stream_out_process:
                image = frame[0]
                # resized_image = cv2.resize(image, (640, 360))

                self.stream_out_process.stdin.write(image.tobytes())
                # elapsed_time = time.time() - self.start_process_time
                elapsed_time = time.time() - self.last_push_time
                sleep_time = frame_interval - elapsed_time - 0.01
                # print(f'elapsed_time = {elapsed_time*1000:.3f}ms, sleep_time = {sleep_time*1000:.3f}ms')
                time.sleep(max(0, sleep_time))
                self.last_push_time = time.time()


    def send_thread_func(self, cancel_flag):
        """Read packets from the demuxer and submit them to the decoder.

        Runs until an EOS packet has been sent or ``cancel_flag`` is set. When
        reading fails on a non-EOS packet, the demuxer is closed and reopened
        after 0.5 s. Every submitted packet is announced on the send queue
        with its sequence number; EOS is announced as ``-1``.

        Args:
            cancel_flag: Object whose ``value`` becomes truthy to stop the loop.
        """
        # Bind this thread to the channel's device context.
        sdk.lyn_set_current_context(self.__ctx)
        eos = False
        while not eos:
            # Read one packet from the demuxer.
            pkt, ret = sdk.lyn_demux_read_packet(self.__demux_hdl)
            eos = pkt.eos
            if ret != 0 and not eos:
                sdk.lyn_demux_close(self.__demux_hdl)
                time.sleep(500.0 / 1000)
                logger.warning("demux failed, reconnecting...")
                self.__demux_hdl, ret = sdk.lyn_demux_open(self.attr.url)

                if not ret:
                    self.fps, ret = sdk.lyn_demux_get_framerate(self.__demux_hdl)
                    # Label corrected: this checks the framerate query.
                    common.error_check(ret, "lyn_demux_get_framerate")
                    logger.debug(f'fps = {self.fps}')
                # Honour cancellation while the source is unreachable; the
                # ``continue`` below would otherwise skip the check at the
                # bottom of the loop forever.
                if cancel_flag.value:
                    return
                continue

            # Submit the packet to the decoder and wait for the stream.
            ret = sdk.lyn_vdec_send_packet_async(self.send_stream, self.__vdec_hdl, pkt)
            common.error_check(ret, "lyn_vdec_send_packet_async")
            ret = sdk.lyn_synchronize_stream(self.send_stream)
            common.error_check(ret, "lyn_synchronize_stream")

            # Release the packet and notify the receiving side.
            if not eos:
                sdk.lyn_demux_free_packet(pkt)
                self.__send_num += 1
                self.__send_queue.put(self.__send_num)
            else:
                self.__send_queue.put(-1)
            if cancel_flag.value:
                return

    def recv_thread_func(self, cancel_flag):
        """Queue one decoder receive request per sequence number from the send side.

        For every number taken from the send queue a ``recv_cb_data`` is
        filled in, a receive command is queued on ``recv_stream`` and
        ``recv_frame_cb`` is registered to run once it completes. The loop
        ends after a negative number (EOS) has been handled or when
        ``cancel_flag`` is set.

        Args:
            cancel_flag: Object whose ``value`` becomes truthy to stop the loop.
        """
        # Bind this thread to the channel's device context.
        sdk.lyn_set_current_context(self.__ctx)
        frame_pool = bufferpool.buffer_pool(self.vdec_out_info.predict_buf_size, 5)

        while self.__recv_num >= 0:
            self.__recv_num = self.__send_queue.take()

            request = recv_cb_data()
            request.frame.eos = self.__recv_num < 0
            request.frame.data = frame_pool.pop()
            request.frame.size = self.vdec_out_info.predict_buf_size
            request.frame_pool = frame_pool
            request.send_num = self.__send_num
            request.recv_num = self.__recv_num
            request.attr = self.attr
            request.block_queue = self.__ipe_queue
            request.video_frame = self.video_frame

            # Queue the receive command, then the completion callback.
            ret = sdk.lyn_vdec_recv_frame_async(
                self.recv_stream, self.__vdec_hdl, request.frame
            )
            common.error_check(ret, "lyn_vdec_recv_frame_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.recv_stream, recv_frame_cb, request
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

            if cancel_flag.value:
                return

    def ipe_thread_func(self, cancel_flag):
        """Take decoded frames from the IPE queue and run them through ``model_process``.

        The loop stops after the frame flagged EOS has been processed, or as
        soon as ``cancel_flag`` is set.

        Args:
            cancel_flag: Object whose ``value`` becomes truthy to stop the loop.
        """
        # Bind this thread to the channel's device context.
        sdk.lyn_set_current_context(self.__ctx)
        while True:
            decoded = self.__ipe_queue.take()
            is_last = decoded.frame.eos
            self.start_process_time = time.time()
            # ``decoded`` carries one video frame parsed from the stream.
            self.model_process(decoded)
            if cancel_flag.value or is_last:
                return

    def ipe_process(self, cb_data: framepool_cb_data):
        """Queue the IPE preprocessing of one decoded frame.

        Configures resize, padding (fill value 114 on all three channels) and
        conversion to RGB24, then submits the job on ``ipe_stream``. The call
        does not wait for the job to finish.

        Args:
            cb_data: Callback data whose ``frame.data`` is the decoded frame.

        Returns:
            The output buffer taken from ``ipe_output_mem_pool``.
        """
        sdk.lyn_set_current_context(self.__ctx)
        out_buf = self.ipe_output_mem_pool.pop()
        src_desc = self.ipe_input_desc
        dst_desc = self.ipe_output_desc
        cfg = self.ipe_config

        # Describe the input picture.
        ret = sdk.lyn_ipe_set_input_pic_desc(
            src_desc,
            cb_data.frame.data,
            self.vdec_out_info.width,
            self.vdec_out_info.height,
            self.__vdec_attr.output_fmt,
        )
        common.error_check(ret, "lyn_ipe_set_input_pic_desc")

        ret = sdk.lyn_ipe_set_output_pic_data(dst_desc, out_buf)
        common.error_check(ret, "lyn_ipe_set_output_pic_data")

        ret = sdk.lyn_ipe_set_resize_config(cfg, self.resize_width, self.resize_height)
        common.error_check(ret, "lyn_ipe_set_resize_config")

        pad_fill = (114, 114, 114)
        ret = sdk.lyn_ipe_set_pad_config(
            cfg, self.pad_y, self.pad_x, self.pad_y, self.pad_x, *pad_fill
        )
        common.error_check(ret, "lyn_ipe_set_pad_config")

        ret = sdk.lyn_ipe_set_c2c_config(cfg, sdk.lyn_pixel_format_t.LYN_PIX_FMT_RGB24, 0)
        common.error_check(ret, "lyn_ipe_set_c2c_config")

        ret = sdk.lyn_ipe_cal_output_pic_desc(dst_desc, src_desc, cfg, 0)
        common.error_check(ret, "lyn_ipe_cal_output_pic_desc")

        ret = sdk.lyn_ipe_process_async(self.ipe_stream, src_desc, dst_desc, cfg)
        common.error_check(ret, "lyn_ipe_process_async")

        return out_buf

    def get_channel_info(self) -> str:
        return f'{self.attr.device_id}_{self.attr.output_path}'

    
    def model_process(self, cb_data):
        """Process one decoded frame: run detection or only redraw, then encode.

        The frame counter advances modulo ``frame_step``. When it is 0 the
        frame goes through preprocessing, inference and post-processing for
        every model, and ``show_video_cb`` is queued on the last model's
        plugin stream. On all other frames the most recent boxes are redrawn.
        Either way the frame is then passed to ``encode_process``.

        Args:
            cb_data: Callback data carrying the decoded frame.
        """
        self.frame_idx = (self.frame_idx + 1) % self.frame_step

        if self.frame_idx != 0:
            # Skip IPE/APU work and draw the boxes from the last detection.
            for handler in self.model_infos:
                self.plugin_draw_process(handler, cb_data)
            self.encode_process(cb_data)
            common.print_frame_rate(self.get_channel_info())
            return

        # Full path: preprocess once, then run every model on the result.
        ipe_out_data = self.ipe_process(cb_data)
        for handler in self.model_infos:
            inference_out = self.apu_process(handler, ipe_out_data)
            self.plugin_process(handler, inference_out, cb_data)

        last_handler = self.model_infos[-1]
        ret = sdk.lyn_stream_add_async_callback(
            last_handler.plugin_stream,
            show_video_cb,
            [cb_data, self.model_infos, self.last_alarm_time],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

        self.encode_process(cb_data)

        common.print_frame_rate(self.get_channel_info())

        # Hand the preprocessing buffer back once the last model's APU stream
        # reaches this point.
        ret = sdk.lyn_stream_add_async_callback(
            last_handler.apu_stream,
            free_to_pool_callback,
            [self.ipe_output_mem_pool, ipe_out_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")


    def apu_process(self, model_info, ipe_out_data):
        """Queue inference for one model on the preprocessed frame.

        The model's APU stream is made to wait for an event recorded on the
        IPE stream, so inference starts only after preprocessing is done.

        Args:
            model_info: Handler of the model to run.
            ipe_out_data: Buffer returned by ``ipe_process``.

        Returns:
            The output buffer taken from the model's ``apu_output_mem_pool``.
        """
        stream = model_info.apu_stream

        # Wait for the IPE stage to finish.
        ret = sdk.lyn_record_event(self.ipe_stream, self.ipe_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(stream, self.ipe_event)
        common.error_check(ret, "lyn_stream_wait_event")

        result_buf = model_info.apu_output_mem_pool.pop()
        ret = sdk.lyn_execute_model_async(
            stream, model_info.model, ipe_out_data, result_buf, self.batch_size
        )
        common.error_check(ret, "lyn_execute_model_async")
        return result_buf

    def plugin_process(self,model_info, apu_output_data, cb_data):
        """Queue post-processing and box drawing for one model.

        After the model's plugin stream has been made to wait for its APU
        stream, two plugin functions are queued: ``lynPostProcess`` (fills the
        model's ``boxes_info``) and ``lynDrawBoxAndText`` (draws onto the
        frame). Finally a callback returns ``apu_output_data`` to its pool.

        Args:
            model_info: Handler of the model being processed.
            apu_output_data: Inference output buffer from ``apu_process``.
            cb_data: Callback data whose ``frame.data`` is drawn on.
        """
        stream = model_info.plugin_stream

        # Wait for the APU stage to finish.
        ret = sdk.lyn_record_event(model_info.apu_stream, model_info.apu_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(stream, model_info.apu_event)
        common.error_check(ret, "lyn_stream_wait_event")

        pix_fmt = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        capsule_ptr = pythonapi.PyCapsule_GetPointer
        capsule_ptr.restype = c_void_p
        capsule_ptr.argtypes = [py_object, c_char_p]
        apu_data_ptr = capsule_ptr(apu_output_data, None)
        frame_data_ptr = capsule_ptr(cb_data.frame.data, None)
        boxes_info_ptr = capsule_ptr(model_info.boxes_info, None)
        sub_objects_info_ptr = capsule_ptr(model_info.sub_objects_info, None)

        src_width = self.codec_para.width
        src_height = self.codec_para.height

        post_para = struct.pack(
            '6IH2f?3P',
            src_width,
            src_height,
            self.model_width,
            self.model_height,
            model_info.class_num,
            500,  # nmsTopK
            model_info.anchor_size,
            0.25,  # score threshold
            0.45,  # nms threshold
            True,  # is pad resize
            apu_data_ptr,
            boxes_info_ptr,
            sub_objects_info_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            stream, model_info.plugin, "lynPostProcess", post_para, len(post_para)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        draw_para = struct.pack(
            'P2IiP', boxes_info_ptr, src_width, src_height, pix_fmt, frame_data_ptr
        )
        ret = sdk.lyn_plugin_run_async(
            stream, model_info.plugin, "lynDrawBoxAndText", draw_para, len(draw_para)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        # Return the inference buffer to its pool after the plugin work.
        ret = sdk.lyn_stream_add_async_callback(
            stream,
            free_to_pool_callback,
            [model_info.apu_output_mem_pool, apu_output_data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")
    
    def plugin_draw_process(self,model_info, cb_data):
        """Queue drawing of the model's current boxes onto a frame.

        No inference is run; whatever ``boxes_info`` currently holds is drawn
        by the plugin's ``lynDrawBoxAndText`` function.

        Args:
            model_info: Handler whose ``boxes_info`` is drawn.
            cb_data: Callback data whose ``frame.data`` is drawn on.
        """
        pix_fmt = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        capsule_ptr = pythonapi.PyCapsule_GetPointer
        capsule_ptr.restype = c_void_p
        capsule_ptr.argtypes = [py_object, c_char_p]

        frame_data_ptr = capsule_ptr(cb_data.frame.data, None)
        boxes_info_ptr = capsule_ptr(model_info.boxes_info, None)

        draw_para = struct.pack(
            'P2IiP',
            boxes_info_ptr,
            self.codec_para.width,
            self.codec_para.height,
            pix_fmt,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            model_info.plugin_stream,
            model_info.plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")



    def merge_plugin_process(self, cb_data):
        """Merge every model's boxes into one buffer, draw them, and queue alarm handling.

        Runs on the last model's plugin and plugin stream: ``mergeBoxes``
        combines the per-model ``boxes_info`` buffers into
        ``self.total_boxes_info``, ``lynDrawBoxAndText`` draws the merged
        boxes onto the frame, and ``show_video_cb`` is queued afterwards.

        The parameter block has room for four source pointers; unused slots
        are filled with 0, and ``struct.pack`` raises when more than four
        models are configured.

        Args:
            cb_data: Callback data whose ``frame.data`` is drawn on.
        """
        plugin = self.model_infos[-1].plugin
        plugin_stream = self.model_infos[-1].plugin_stream

        pythonapi.PyCapsule_GetPointer.restype = c_void_p
        pythonapi.PyCapsule_GetPointer.argtypes = [py_object, c_char_p]

        target_ptr = pythonapi.PyCapsule_GetPointer(self.total_boxes_info, None)
        sources_ptrs = [
            pythonapi.PyCapsule_GetPointer(model_info.boxes_info, None) for model_info in self.model_infos
        ]
        while len(sources_ptrs) < 4:
            sources_ptrs.append(0)

        box_merge_params = struct.pack(
            "P4PH",                # layout: target pointer + 4 source pointers + source count
            target_ptr,             # address of the merge target
            *sources_ptrs,          # addresses of the sources
            len(self.model_infos)   # number of valid sources
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream, plugin, "mergeBoxes", box_merge_params, len(box_merge_params)
        )
        common.error_check(ret, "lyn_plugin_run_async")

        # The merged buffer is the draw source; its pointer was read above.
        frame_data_ptr = pythonapi.PyCapsule_GetPointer(cb_data.frame.data, None)
        pix_fmt = int(sdk.lyn_pixel_format_t.LYN_PIX_FMT_NV12)
        draw_para = struct.pack(
            'P2IiP',
            target_ptr,
            self.codec_para.width,
            self.codec_para.height,
            pix_fmt,
            frame_data_ptr,
        )
        ret = sdk.lyn_plugin_run_async(
            plugin_stream,
            plugin,
            "lynDrawBoxAndText",
            draw_para,
            len(draw_para),
        )
        common.error_check(ret, "lyn_plugin_run_async")

        # show_video_cb reads params[2] and iterates params[1] as model
        # handlers, so it gets the same three-element list model_process
        # passes. The original two-element list would have failed inside the
        # callback. Alarms are therefore evaluated from the per-model box
        # buffers, not from the merged one.
        ret = sdk.lyn_stream_add_async_callback(
            plugin_stream,
            show_video_cb,
            [cb_data, self.model_infos, self.last_alarm_time],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def encode_process(self, cb_data: cb_data):
        """Queue encoding of ``cb_data.frame`` after the plugin stream is done with it.

        Steps, in submission order:

        1. Record ``self.plugin_event`` on the last model's plugin stream and
           make ``self.venc_send_stream`` wait on it.
        2. On the first call only (``self.enc_head_flag``), request the encoder
           parameter sets on the receive stream and queue ``save_file_cb`` for them.
        3. Send the frame to the encoder, request the resulting packet, and
           queue ``save_file_cb`` for it.
        4. Queue ``free_to_pool_callback`` with the frame's pool and buffer.

        Each SDK return code is passed to ``common.error_check``.
        """

        def build_save_request(packet):
            # Bundle what save_file_cb receives for one output packet.
            request = save_file_cb_data()
            request.packet = packet
            request.recv_pool = self.venc_recv_pool
            request.file_path = self.attr.output_path
            request.output_path = self.attr.output_path
            request.video_frame = self.attr.video_frame
            return request

        # Encode only after the plugin work on this frame has finished.
        last_plugin_stream = self.model_infos[-1].plugin_stream
        ret = sdk.lyn_record_event(last_plugin_stream, self.plugin_event)
        common.error_check(ret, "lyn_record_event")
        ret = sdk.lyn_stream_wait_event(self.venc_send_stream, self.plugin_event)
        common.error_check(ret, "lyn_stream_wait_event")

        if self.enc_head_flag:
            # First encoded frame: fetch the encoder parameter sets once.
            self.enc_head_flag = False
            head_packet = sdk.lyn_packet_t()
            head_packet.size = self.vdec_out_info.predict_buf_size
            head_packet.data = self.venc_recv_pool.pop()
            head_request = build_save_request(head_packet)

            ret = sdk.lyn_venc_get_paramsset_async(
                self.venc_recv_stream, self.venc_handle, head_packet
            )
            common.error_check(ret, "lyn_venc_get_paramsset_async")
            ret = sdk.lyn_stream_add_async_callback(
                self.venc_recv_stream, save_file_cb, [head_request, cb_data]
            )
            common.error_check(ret, "lyn_stream_add_async_callback")

        ret = sdk.lyn_venc_sendframe_async(
            self.venc_send_stream, self.venc_handle, cb_data.frame
        )
        common.error_check(ret, "lyn_venc_sendframe_async")

        # Output packet for this frame; the end-of-stream flag follows the frame.
        frame_packet = sdk.lyn_packet_t()
        frame_packet.size = self.vdec_out_info.predict_buf_size
        frame_packet.eos = cb_data.frame.eos
        frame_packet.data = self.venc_recv_pool.pop()
        frame_request = build_save_request(frame_packet)

        ret = sdk.lyn_venc_recvpacket_async(
            self.venc_recv_stream, self.venc_handle, frame_packet
        )
        common.error_check(ret, "lyn_venc_recvpacket_async")
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream, save_file_cb, [frame_request, cb_data]
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

        # Queued on the receive stream, after the packet callback above.
        ret = sdk.lyn_stream_add_async_callback(
            self.venc_recv_stream,
            free_to_pool_callback,
            [cb_data.frame_pool, cb_data.frame.data],
        )
        common.error_check(ret, "lyn_stream_add_async_callback")

    def run(self):
        """Start the send, receive and image-processing/inference worker threads.

        Each thread receives the module-level ``cancel_flag`` as its only
        argument. This method returns as soon as the threads are started;
        ``close()`` is what joins them.
        """
        worker_args = (cancel_flag,)

        # Sender thread.
        self.__send_thread = threading.Thread(
            target=self.send_thread_func, args=worker_args
        )
        self.__send_thread.start()

        # Receiver thread.
        self.__recv_thread = threading.Thread(
            target=self.recv_thread_func, args=worker_args
        )
        self.__recv_thread.start()

        # Image-processing and inference thread.
        self.__ipe_thread = threading.Thread(
            target=self.ipe_thread_func, args=worker_args
        )
        self.__ipe_thread.start()

    def close(self):
        """Join the worker threads, then release the SDK resources this instance holds.

        Teardown order: join threads, synchronize all streams, destroy all
        streams, destroy events, destroy IPE descriptors and config,
        unregister the plugin, unload each model, close the decoder, destroy
        the context. Each SDK return code is passed to ``common.error_check``.
        """
        # Wait for any worker that is still running.
        sender = self.__send_thread
        if sender.is_alive():
            sender.join()
        receiver = self.__recv_thread
        if receiver.is_alive():
            receiver.join()
        ipe_worker = self.__ipe_thread
        if ipe_worker.is_alive():
            ipe_worker.join()

        # Streams are looked up by attribute name, one at a time and in this
        # order, first to synchronize them all and then to destroy them all.
        stream_attrs = (
            "send_stream",
            "recv_stream",
            "ipe_stream",
            "apu_stream",
            "plugin_stream",
            "venc_recv_stream",
            "venc_send_stream",
        )
        for attr_name in stream_attrs:
            ret = sdk.lyn_synchronize_stream(getattr(self, attr_name))
            common.error_check(ret, "lyn_synchronize_stream")

        for attr_name in stream_attrs:
            ret = sdk.lyn_destroy_stream(getattr(self, attr_name))
            common.error_check(ret, "lyn_destroy_stream")

        # NOTE(review): self.plugin_event (used by encode_process) is not
        # destroyed here; confirm whether it is released elsewhere.
        for attr_name in ("ipe_event", "apu_event"):
            ret = sdk.lyn_destroy_event(getattr(self, attr_name))
            common.error_check(ret, "lyn_destroy_event")

        # Destroy the IPE picture descriptors and config.
        for attr_name in ("ipe_input_desc", "ipe_output_desc"):
            ret = sdk.lyn_ipe_destroy_pic_desc(getattr(self, attr_name))
            common.error_check(ret, "lyn_ipe_destroy_pic_desc")
        ret = sdk.lyn_ipe_destroy_config_desc(self.ipe_config)
        common.error_check(ret, "lyn_ipe_destroy_config_desc")
        ret = sdk.lyn_plugin_unregister(self.plugin)
        common.error_check(ret, "lyn_plugin_unregister")

        # Unload every model.
        for loaded in self.model_infos:
            ret = sdk.lyn_unload_model(loaded.model)
            common.error_check(ret, "lyn_unload_model")

        # The decoder handle and context are skipped when they equal "".
        if self.__vdec_hdl != "":
            ret = sdk.lyn_vdec_close(self.__vdec_hdl)
            common.error_check(ret, "lyn_vdec_close")

        if self.__ctx != "":
            ret = sdk.lyn_destroy_context(self.__ctx)
            common.error_check(ret, "lyn_destroy_context")


def run_device(device_no: str, input_url: str, output_url: str, device_id: int, model_configs, alarm_interval, threads) -> None:
    """Build the pipeline for one stream and start its two top-level threads.

    Fills a ``device_process_attr`` from the arguments (plus a bounded
    ``queue.Queue(10)`` for ``video_frame``), constructs a ``yolov5`` from it,
    and starts one thread running ``decoder.run`` and one running
    ``decoder.push_video``.

    Args:
        device_no: Stored on ``attr.device_no``.
        input_url: Source stream URL; must be non-empty.
        output_url: Output path/URL; must be non-empty.
        device_id: Stored on ``attr.device_id``.
        model_configs: Stored on ``attr.model_configs``.
        alarm_interval: Stored on ``attr.alarm_interval``.
        threads: List that each successfully started thread is appended to,
            so the caller can join them.

    Raises:
        ValueError: If the input URL or output path is empty.

    Any exception raised while constructing the pipeline or starting the
    threads is logged and swallowed; this function does not re-raise it.
    """
    attr = device_process_attr()
    attr.device_no = device_no
    attr.url = input_url
    attr.output_path = output_url
    attr.device_id = device_id
    attr.alarm_interval = alarm_interval
    attr.video_frame = queue.Queue(10)
    attr.model_configs = model_configs
    if not attr.url:
        raise ValueError('input file miss!!!')
    if not attr.output_path:
        raise ValueError('unspecified output path!!!')

    try:
        decoder = yolov5(attr)

        # Register a thread only after start() succeeds. If start() raised
        # while the thread was already in `threads`, the caller's join()
        # would fail with RuntimeError on the never-started thread.
        infer_thread = threading.Thread(target=decoder.run, args=())
        infer_thread.start()
        threads.append(infer_thread)

        out_thread = threading.Thread(target=decoder.push_video, args=())
        out_thread.start()
        threads.append(out_thread)
    except Exception as e:
        logger.exception(f'model thread start failed: {e}')

    # NOTE(review): decoder.close() is not called from here, so this function
    # never releases the SDK resources; confirm where cleanup should happen.


if __name__ == "__main__":
    # On Ctrl-C, cancel_process sets the shared cancel_flag that the worker
    # threads are given.
    signal.signal(signal.SIGINT, cancel_process)

    # run_device() requires device_no and alarm_interval, which these demo
    # entries never supplied (the previous 5-argument call raised TypeError).
    # NOTE(review): the fallbacks below are placeholders. Confirm the real
    # device numbers and the alarm interval (and its unit) before relying
    # on them.
    DEFAULT_ALARM_INTERVAL = 60

    # Demo stream configuration: two outputs fed from the same camera channel.
    # NOTE(review): the RTSP URLs embed camera credentials in source control;
    # consider loading them from configuration or the environment instead.
    # NOTE(review): ModelHandler.__init__ also takes model_code, handle_task
    # and objects; confirm these entries still match what yolov5 expects.
    devices = [
        {
            'input_url': 'rtsp://admin:Casic203@192.168.83.64:554/Streaming/Channels/101',
            'output_url': 'rtmp://192.168.83.100:1935/live/101',
            'model_configs': [
                {
                    'model_path': 'models/yolov5s/Net_0',
                    'plugin_path': 'plugins/libYolov5Plugin.so',
                    'model_name': 'yolov5s',
                },
                {
                    'model_path': 'models/yuntai/Net_0',
                    'plugin_path': 'plugins/libMultiPlugin.so',
                    'model_name': 'yuntai',
                },
            ],
        },
        {
            'input_url': 'rtsp://admin:Casic203@192.168.83.64:554/Streaming/Channels/101',
            'output_url': 'rtmp://192.168.83.100:1935/live/102',
            'model_configs': [
                {
                    'model_path': 'models/yolov5s/Net_0',
                    'plugin_path': 'plugins/libYolov5Plugin.so',
                    'model_name': 'yolov5s',
                },
                {
                    'model_path': 'models/yuntai/Net_0',
                    'plugin_path': 'plugins/libMultiPlugin.so',
                    'model_name': 'yuntai',
                },
            ],
        },
    ]

    device_count, ret = sdk.lyn_get_device_count()
    if ret != 0 or device_count <= 0:
        print("Error: Unable to fetch device count or no devices available.")
        # sys.exit rather than the site-provided exit(), which is meant for
        # interactive sessions.
        sys.exit(1)

    threads = []
    for idx, device in enumerate(devices):
        # Spread the streams evenly over the available chips; device ids start at 0.
        device_id = idx % device_count
        # Keyword arguments keep this call aligned with run_device's signature.
        run_device(
            device_no=device.get('device_no', f'device-{idx}'),
            input_url=device['input_url'],
            output_url=device['output_url'],
            device_id=device_id,
            model_configs=device['model_configs'],
            alarm_interval=device.get('alarm_interval', DEFAULT_ALARM_INTERVAL),
            threads=threads,
        )

    # Wait for every started thread to finish.
    for thread in threads:
        thread.join()