from datetime import datetime
from typing import Optional, Tuple, Sequence

from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select

from entity.data_gas import DataGas, DataGasInfo
from entity.device import Device


class DataGasService:
    """Insert and query ``DataGas`` rows through an ``AsyncSession``."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def add_data_gas(self, data_gas: DataGas):
        """Add ``data_gas`` to the session, commit, and return it refreshed."""
        self.db.add(data_gas)
        await self.db.commit()
        # Reload the instance's attributes from the database after the commit.
        await self.db.refresh(data_gas)
        return data_gas

    async def get_data_gas_page(
        self,
        device_code: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        offset: int = 0,
        limit: int = 10,
    ) -> Tuple[Sequence[DataGasInfo], int]:
        """Return one page of matching readings plus the total match count.

        Args:
            device_code: Optional substring to look for in the device code.
            start_time: Optional inclusive lower bound on ``DataGas.ts``.
            end_time: Optional inclusive upper bound on ``DataGas.ts``.
            offset: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            A ``(items, total)`` tuple where ``total`` counts every row that
            matches the filters, ignoring ``offset`` and ``limit``.
        """
        # Keyword arguments guard against swapping the two time bounds, since
        # gas_query takes end_time before start_time.
        statement = self.gas_query(
            device_code, end_time=end_time, start_time=start_time
        )

        # Total number of matching rows. Ordering cannot affect a count, so
        # the ORDER BY is cancelled before the statement becomes a subquery.
        total_statement = select(func.count()).select_from(
            statement.order_by(None).subquery()
        )
        total_result = await self.db.execute(total_statement)
        total = total_result.scalar_one()

        # Apply the pagination window and fetch the page.
        page_statement = statement.offset(offset).limit(limit)
        results = await self.db.execute(page_statement)

        return self._to_info_list(results.all()), total

    async def get_data_gas_list(
        self,
        device_code: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
    ) -> Sequence[DataGasInfo]:
        """Return every matching reading, without pagination.

        Args:
            device_code: Optional substring to look for in the device code.
            start_time: Optional inclusive lower bound on ``DataGas.ts``.
            end_time: Optional inclusive upper bound on ``DataGas.ts``.
        """
        statement = self.gas_query(
            device_code, end_time=end_time, start_time=start_time
        )
        results = await self.db.execute(statement)
        return self._to_info_list(results.all())

    def gas_query(self, device_code, end_time, start_time):
        """Build the filtered, ordered SELECT shared by the query methods.

        Note the parameter order: ``end_time`` comes before ``start_time``.

        Args:
            device_code: Substring filter on ``DataGas.device_code``; a falsy
                value (``None`` or ``""``) disables the filter.
            end_time: Inclusive upper bound on ``DataGas.ts``, or ``None``.
            start_time: Inclusive lower bound on ``DataGas.ts``, or ``None``.

        Returns:
            A statement selecting ``(DataGas, Device.name)`` pairs, newest
            first. The join is an inner join, so readings whose device code
            matches no ``Device.code`` are not returned.
        """
        statement = select(DataGas, Device.name).join(
            Device, DataGas.device_code == Device.code
        )

        if device_code:
            # autoescape makes "%" and "_" in the input match literally
            # instead of acting as LIKE wildcards.
            statement = statement.where(
                DataGas.device_code.contains(device_code, autoescape=True)
            )
        if start_time is not None:
            statement = statement.where(DataGas.ts >= start_time)
        if end_time is not None:
            statement = statement.where(DataGas.ts <= end_time)

        # The id tie-breaker gives rows with equal timestamps a fixed order,
        # so OFFSET/LIMIT pages neither repeat nor skip them.
        return statement.order_by(DataGas.ts.desc(), DataGas.id.desc())

    @staticmethod
    def _to_info_list(rows):
        """Convert ``(DataGas, device_name)`` rows into ``DataGasInfo`` objects."""
        return [
            DataGasInfo(
                id=data_gas.id,
                device_code=data_gas.device_code,
                gas_value=data_gas.gas_value,
                ts=data_gas.ts,
                device_name=device_name,
            )
            for data_gas, device_name in rows
        ]