import asyncio import threading import cv2 from fastapi import APIRouter, Request, WebSocket, WebSocketDisconnect from starlette.responses import StreamingResponse from common.display_frame_manager import DisplayFrameManager from common.global_logger import logger router = APIRouter() display_frame_manager = DisplayFrameManager() async def generate_video_stream(device_id, request: Request): while True: if await request.is_disconnected(): print("客户端已断开连接,停止视频流") break if display_frame_manager.has_device(device_id): frame = display_frame_manager.get_latest_frame(device_id) # 将帧编码为 JPEG 格式 _, buffer = cv2.imencode('.jpg', frame) frame = buffer.tobytes() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') await asyncio.sleep(0.03) # 控制帧的推送频率,减小CPU占用 @router.get("/video") async def video_stream(device_id: int, request: Request): return StreamingResponse(generate_video_stream(device_id, request), media_type="multipart/x-mixed-replace; boundary=frame") async def send_video_stream(device_id, websocket: WebSocket): await websocket.accept() # 接受 WebSocket 连接 try: while True: # 检查并获取设备的最新帧 frame = None if display_frame_manager.has_device(device_id): frame = display_frame_manager.get_latest_frame(device_id) # 如果有可用帧,则进行处理并发送 if frame is not None: # 将帧编码为 JPEG _, buffer = cv2.imencode('.jpg', frame) # 将图像数据编码为 Base64 字符串(或者可以直接发送字节流) await websocket.send_bytes(buffer.tobytes()) await asyncio.sleep(0) else: # 如果没有帧,可以选择发送空帧或跳过 await asyncio.sleep(0.03) # 控制帧率,避免过度占用资源 except WebSocketDisconnect: logger.info("WebSocket 连接已断开") except Exception as e: logger.error(f"WebSocket 连接出错: {e}") def run_in_thread(device_id, websocket: WebSocket): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(send_video_stream(device_id, websocket)) loop.close() @router.websocket("/ws/video/{device_id}") async def video_stream(websocket: WebSocket, device_id: int): await send_video_stream(device_id,websocket)