# safe-algo-pro / services / data_gas_service.py
# Last change: zhangyingjie, 2 Nov (2 KB): add methane query task and interface.
from datetime import datetime
from typing import Optional, Tuple, Sequence

from sqlalchemy import func
from sqlmodel import Session, select

from entity.data_gas import DataGas


class DataGasService:
    """Persistence and query helpers for ``DataGas`` records.

    Every operation runs on the session supplied at construction time; the
    service never opens or closes the session itself.
    """

    def __init__(self, db: Session):
        """Keep a reference to the session used by all methods.

        Args:
            db: Session on which statements are executed and committed.
        """
        self.db = db

    def add_data_gas(self, data_gas: DataGas) -> DataGas:
        """Insert one record, commit, and return it refreshed from the database.

        Args:
            data_gas: The record to persist.

        Returns:
            The same ``data_gas`` object after ``refresh``.

        Raises:
            Exception: Whatever ``commit`` raised; the session is rolled back
                before the exception propagates.
        """
        self.db.add(data_gas)
        try:
            self.db.commit()
        except Exception:
            # A failed commit leaves the session's transaction unusable until
            # it is rolled back; do that here, then let the caller see the
            # original error.
            self.db.rollback()
            raise
        self.db.refresh(data_gas)
        return data_gas

    def get_data_gas_page(self,
                          device_code: Optional[str] = None,
                          start_time: Optional[datetime] = None,
                          end_time: Optional[datetime] = None,
                          offset: int = 0,
                          limit: int = 10
                          ) -> Tuple[Sequence[DataGas], int]:
        """Return one page of matching records plus the total match count.

        Args:
            device_code: Optional substring that ``device_code`` must contain.
            start_time: Optional inclusive lower bound on ``ts``.
            end_time: Optional inclusive upper bound on ``ts``.
            offset: Number of matching rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            A ``(rows, total)`` tuple: ``rows`` is the requested page ordered
            by ``ts`` descending, ``total`` is the number of rows matching the
            filters regardless of paging.
        """
        # Keywords are used because gas_query takes end_time before start_time.
        statement = self.gas_query(device_code=device_code,
                                   end_time=end_time,
                                   start_time=start_time)

        # Count all matching rows. Ordering is irrelevant for a count (and an
        # ORDER BY inside a derived table is rejected by some backends), so it
        # is cancelled before the statement is wrapped as a subquery.
        total_statement = select(func.count()).select_from(
            statement.order_by(None).subquery()
        )
        total = self.db.exec(total_statement).one()

        # Apply paging to the ordered statement and fetch the page.
        page_statement = statement.offset(offset).limit(limit)
        rows = self.db.exec(page_statement).all()
        return rows, total

    def get_data_gas_list(self,
                          device_code: Optional[str] = None,
                          start_time: Optional[datetime] = None,
                          end_time: Optional[datetime] = None,
                          ) -> Sequence[DataGas]:
        """Return every matching record, ordered by ``ts`` descending.

        Args:
            device_code: Optional substring that ``device_code`` must contain.
            start_time: Optional inclusive lower bound on ``ts``.
            end_time: Optional inclusive upper bound on ``ts``.

        Returns:
            All rows matching the filters (no paging is applied).
        """
        statement = self.gas_query(device_code=device_code,
                                   end_time=end_time,
                                   start_time=start_time)
        return self.db.exec(statement).all()

    def gas_query(self,
                  device_code: Optional[str],
                  end_time: Optional[datetime],
                  start_time: Optional[datetime]):
        """Build the filtered, ordered ``select`` shared by the query methods.

        Note the parameter order: ``end_time`` comes before ``start_time``.
        Prefer keyword arguments when calling.

        Args:
            device_code: If non-empty, keep rows whose ``device_code`` contains
                this text. ``%`` and ``_`` in the text are matched literally.
            end_time: If given, keep rows with ``ts <= end_time``.
            start_time: If given, keep rows with ``ts >= start_time``.

        Returns:
            A select statement over ``DataGas`` ordered by ``ts`` descending.
        """
        statement = select(DataGas)
        if device_code:
            # autoescape stops LIKE wildcards in caller input from widening
            # the match (e.g. "_" in a device code, or a lone "%").
            statement = statement.where(
                DataGas.device_code.contains(device_code, autoescape=True)
            )
        if start_time is not None:
            statement = statement.where(DataGas.ts >= start_time)
        if end_time is not None:
            statement = statement.where(DataGas.ts <= end_time)
        return statement.order_by(DataGas.ts.desc())