"""HTTP routes for paging through device frames and serving frame images."""
import io
import os
from io import BytesIO
from typing import List, Optional

import cv2
from fastapi import APIRouter, Depends, Query, HTTPException, Request
from fastapi.responses import StreamingResponse, FileResponse
from sqlmodel import Session

from apis.base import StandardResponse, PageResponse, standard_response, standard_error_response
from common.biz_exception import BizException
from db.database import get_db
from entity.base import parse_datetime
from entity.device_frame import DeviceFrame
from services.device_frame_service import DeviceFrameService

router = APIRouter()

# Size of each piece yielded when streaming an encoded image (64 KB).
_STREAM_CHUNK_SIZE = 64 * 1024


def _iter_chunks(data: bytes, chunk_size: int = _STREAM_CHUNK_SIZE):
    """Yield successive pieces of ``data``, each at most ``chunk_size`` bytes."""
    # BytesIO lets the in-memory bytes be read like a file, chunk by chunk.
    buffer = io.BytesIO(data)
    while chunk := buffer.read(chunk_size):
        yield chunk


@router.get("/page/", response_model=StandardResponse[PageResponse[DeviceFrame]])
def get_frame_page(
        device_name: Optional[str] = None,
        device_code: Optional[str] = None,
        frame_start_time: Optional[str] = None,
        frame_end_time: Optional[str] = None,
        offset: int = Query(0, ge=0),  # index of the first record to return
        limit: int = Query(10, ge=1),  # number of records per page
        db: Session = Depends(get_db)):
    """Return one page of device frames matching the optional filters.

    Args:
        device_name: Optional device-name filter, forwarded to the service.
        device_code: Optional device-code filter, forwarded to the service.
        frame_start_time: Optional start-time filter as a string; converted
            with ``parse_datetime`` before being passed on.
        frame_end_time: Optional end-time filter as a string; converted
            with ``parse_datetime`` before being passed on.
        offset: Zero-based index of the first record (must be >= 0).
        limit: Maximum number of records in the page (must be >= 1).
        db: Database session injected by FastAPI.

    Returns:
        The result of ``standard_response`` wrapping a ``PageResponse`` with
        the total count and the frames of the requested page.
    """
    service = DeviceFrameService(db)

    # Fetch the requested page of frames together with the total count.
    frames, total = service.get_frame_page(
        device_name,
        device_code,
        parse_datetime(frame_start_time),
        parse_datetime(frame_end_time),
        offset,
        limit,
    )

    return standard_response(data=PageResponse(total=total, items=frames))


# Route: encode an in-memory image with OpenCV and return it as a byte stream.
@router.get("/frame_image/")
def get_frame_image(frame_id, db: Session = Depends(get_db)):
    """Stream the image obtained for ``frame_id`` as a JPEG.

    Args:
        frame_id: Identifier of the frame; passed unchanged to
            ``DeviceFrameService.get_frame_annotator``. It carries no type
            annotation, so no type validation is declared for it here.
        db: Database session injected by FastAPI.

    Returns:
        A ``StreamingResponse`` with media type ``image/jpeg``, or the result
        of ``standard_error_response`` when the service returns ``None``.

    Raises:
        HTTPException: Status 500 when loading or JPEG-encoding the frame
            fails.
    """
    try:
        service = DeviceFrameService(db)
        frame = service.get_frame_annotator(frame_id)
        if frame is None:
            return standard_error_response(message="Frame does not exist")

        # imencode reports failure through its first return value; ignoring it
        # would stream an invalid or empty body with a 200 status.
        encoded_ok, img_encoded = cv2.imencode('.jpg', frame)
        if not encoded_ok:
            raise ValueError("JPEG encoding failed")
        img_bytes = img_encoded.tobytes()  # convert the encoded buffer to bytes
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading image: {str(e)}") from e

    # The body is produced lazily while the response is sent, so building the
    # response does not need to sit inside the try block above.
    return StreamingResponse(_iter_chunks(img_bytes), media_type="image/jpeg")


@router.get("/frame_test")
def get_frame_test(request: Request):
    """Return the file ``test.jpg`` as a file response.

    The path is relative, so it is resolved against the process's current
    working directory.

    Args:
        request: The incoming request; not used by this handler.

    Raises:
        HTTPException: Status 404 when ``test.jpg`` does not exist.
    """
    file_path = "test.jpg"
    # Fail with a clear 404 instead of an error raised while sending the file.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="Test image not found")
    return FileResponse(file_path)