# safe-algo-pro / apis / display.py
import asyncio
import threading

import cv2
from fastapi import APIRouter, Request, WebSocket, WebSocketDisconnect
from starlette.responses import StreamingResponse

from common.display_frame_manager import DisplayFrameManager
from common.global_logger import logger

# Router on which the video endpoints defined in this module are registered.
router = APIRouter()
# Module-level frame manager instance; the streaming helpers below query it
# for the latest frame of a given device.
display_frame_manager = DisplayFrameManager()


async def generate_video_stream(device_id, request: Request):
    """Yield JPEG-encoded frames of a device as multipart body chunks.

    The generator polls ``display_frame_manager`` about every 30 ms and
    terminates once the client is reported as disconnected.

    Args:
        device_id: Key used to look the device up in ``display_frame_manager``.
        request: Incoming request; polled to detect a client disconnect.

    Yields:
        bytes: One ``--frame`` part containing a single JPEG image.
    """
    while True:
        if await request.is_disconnected():
            logger.info("客户端已断开连接,停止视频流")
            break

        frame = None
        if display_frame_manager.has_device(device_id):
            frame = display_frame_manager.get_latest_frame(device_id)

        # The WebSocket sender in this module treats the latest frame as
        # possibly None, so guard here as well instead of handing None to
        # cv2.imencode.
        if frame is not None:
            # Encode the frame as JPEG; skip it when the encoder reports failure.
            encoded_ok, buffer = cv2.imencode('.jpg', frame)
            if encoded_ok:
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')

        # Throttle the push rate to keep CPU usage down.
        await asyncio.sleep(0.03)


@router.get("/video")
async def video_stream(device_id: int, request: Request):
    # HTTP endpoint: wraps the frame generator for ``device_id`` in a
    # streaming response declared as multipart/x-mixed-replace with the
    # boundary "frame".
    frame_chunks = generate_video_stream(device_id, request)
    content_type = "multipart/x-mixed-replace; boundary=frame"
    return StreamingResponse(frame_chunks, media_type=content_type)


async def send_video_stream(device_id, websocket: WebSocket):
    """Accept a WebSocket and push JPEG-encoded frames of a device over it.

    Each available frame is sent as one binary message containing the raw
    JPEG bytes. The loop runs until the socket disconnects or an error is
    raised; both cases are logged and the coroutine then returns.

    Args:
        device_id: Key used to look the device up in ``display_frame_manager``.
        websocket: The not-yet-accepted WebSocket connection to stream on.
    """
    await websocket.accept()  # Accept the WebSocket connection.
    try:
        while True:
            # Fetch the device's latest frame, if the device is known.
            frame = None
            if display_frame_manager.has_device(device_id):
                frame = display_frame_manager.get_latest_frame(device_id)

            payload = None
            if frame is not None:
                # Encode as JPEG; a failed encode is treated like "no frame"
                # rather than sending an unusable buffer to the client.
                encoded_ok, buffer = cv2.imencode('.jpg', frame)
                if encoded_ok:
                    payload = buffer.tobytes()

            if payload is not None:
                # The JPEG bytes are sent as a binary message (no Base64).
                await websocket.send_bytes(payload)
                # Yield control to the event loop between frames.
                # NOTE(review): there is no rate limit on this path; if
                # get_latest_frame returns the same frame repeatedly this
                # re-sends it as fast as possible - confirm this is intended.
                await asyncio.sleep(0)
            else:
                # Nothing to send: back off briefly to avoid a busy loop.
                await asyncio.sleep(0.03)
    except WebSocketDisconnect:
        logger.info("WebSocket 连接已断开")
    except Exception as e:
        logger.error(f"WebSocket 连接出错: {e}")


def run_in_thread(device_id, websocket: WebSocket):
    """Run ``send_video_stream`` to completion on a fresh event loop.

    A new event loop is created, installed as the current loop for the
    calling thread, and used to drive the coroutine. The loop is always
    closed afterwards, including when the coroutine raises.

    Args:
        device_id: Forwarded to ``send_video_stream``.
        websocket: Forwarded to ``send_video_stream``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(send_video_stream(device_id, websocket))
    finally:
        # Close the loop even on failure so its resources are not leaked.
        loop.close()


@router.websocket("/ws/video/{device_id}")
async def video_stream(websocket: WebSocket, device_id: int):
    """WebSocket endpoint that delegates streaming to ``send_video_stream``."""
    await send_video_stream(device_id=device_id, websocket=websocket)