Newer
Older
safe-algo-pro / services / data_gas_service.py
from datetime import datetime
from typing import Optional, Tuple, Sequence

from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select

from entity.data_gas import DataGas, DataGasInfo
from entity.device import Device


class DataGasService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def add_data_gas(self, data_gas: DataGas):
        self.db.add(data_gas)
        await self.db.commit()
        await self.db.refresh(data_gas)
        return data_gas

    async def get_data_gas_page(self,
                          device_code: Optional[str] = None,
                          start_time: Optional[datetime] = None,
                          end_time: Optional[datetime] = None,
                          offset: int = 0,
                          limit: int = 10
                          ) -> Tuple[Sequence[DataGasInfo], int]:
        statement = self.gas_query(device_code, end_time, start_time)

        # 查询总记录数
        total_statement = select(func.count()).select_from(statement.subquery())
        total_result = await self.db.execute(total_statement)
        total = total_result.scalar_one()

        # 添加分页限制
        statement = statement.offset(offset).limit(limit)

        # 执行查询并返回结果
        results = await self.db.execute(statement)
        rows = results.all()


        data_gas_info_list = [
            DataGasInfo(
                id=data_gas.id,
                device_code=data_gas.device_code,
                gas_value=data_gas.gas_value,
                ts=data_gas.ts,
                device_name=device_name
            )
            for data_gas, device_name in rows
        ]

        return data_gas_info_list, total  # 返回分页数据和总数

    async def get_data_gas_list(self,
                          device_code: Optional[str] = None,
                          start_time: Optional[datetime] = None,
                          end_time: Optional[datetime] = None,
                          ) -> Sequence[DataGasInfo]:
        statement = self.gas_query(device_code, end_time, start_time)
        results = await self.db.execute(statement)
        rows = results.all()
        data_gas_info_list = [
            DataGasInfo(
                id=data_gas.id,
                device_code=data_gas.device_code,
                gas_value=data_gas.gas_value,
                ts=data_gas.ts,
                device_name=device_name
            )
            for data_gas, device_name in rows
        ]
        return data_gas_info_list

    def gas_query(self, device_code, end_time, start_time):
        statement = (
            select(DataGas, Device.name)
            .join(Device, DataGas.device_code == Device.code)
        )
        if device_code:
            statement = statement.where(DataGas.device_code.like(f"%{device_code}%"))
        if start_time:
            statement = statement.where(DataGas.ts >= start_time)
        if end_time:
            statement = statement.where(DataGas.ts <= end_time)
        statement = statement.order_by(DataGas.ts.desc())
        return statement