Newer
Older
safe-algo-pro / services / device_service.py
zhangyingjie on 14 Oct 3 KB 完善模型检测流程
from datetime import datetime
from typing import Sequence, Optional, Tuple

from sqlalchemy import func
from sqlmodel import Session, select

from common.global_thread_pool import GlobalThreadPool
from common.consts import NotifyChangeType
from entity.device import Device, DeviceCreate, DeviceUpdate
from services.device_model_relation_service import DeviceModelRelationService


class DeviceService:
    def __init__(self, db: Session):
        self.db = db
        self.__device_change_callbacks = []  # 用于存储回调函数
        self.thread_pool = GlobalThreadPool()

    def register_change_callback(self, callback):
        """注册设备变化回调函数"""
        self.__device_change_callbacks.append(callback)

    def notify_change(self, device_id, change_type):
        """当设备发生变化时,调用回调通知变化"""
        for callback in self.__device_change_callbacks:
            self.thread_pool.executor.submit(callback, device_id, change_type)

    def get_device_list(self,
                        name: Optional[str] = None,
                        code: Optional[str] = None,
                        device_type: Optional[str] = None,
                        ) -> Sequence[Device]:
        statement = self.device_query(code, device_type, name)
        results = self.db.exec(statement)
        return results.all()

    def get_device_page(self,
                        name: Optional[str] = None,
                        code: Optional[str] = None,
                        device_type: Optional[str] = None,
                        offset: int = 0,
                        limit: int = 10
                        ) -> Tuple[Sequence[Device], int]:
        statement = self.device_query(code, device_type, name)

        # 查询总记录数
        total_statement = select(func.count()).select_from(statement.subquery())
        total = self.db.exec(total_statement).one()

        # 添加分页限制
        statement = statement.offset(offset).limit(limit)

        # 执行查询并返回结果
        results = self.db.exec(statement)
        return results.all(), total  # 返回分页数据和总数

    def device_query(self, code, device_type, name):
        # 构建查询语句
        statement = select(Device)
        if name:
            statement = statement.where(Device.name.like(f"%{name}%"))
        if code:
            statement = statement.where(Device.code.like(f"%{code}%"))
        if device_type:
            statement = statement.where(Device.type == device_type)
        return statement

    def create_device(self, device_data: DeviceCreate):
        device = Device.model_validate(device_data)
        device.create_time = datetime.now()
        device.update_time = datetime.now()
        self.db.add(device)
        self.db.commit()
        self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_CREATE)
        return device

    def update_device(self, device_data: DeviceUpdate):
        device = self.db.get(Device, device_data.id)
        if not device:
            return None

        update_data = device_data.dict(exclude_unset=True)
        for key, value in update_data.items():
            setattr(device, key, value)

        device.update_time = datetime.now()
        self.db.add(device)
        self.db.commit()
        self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_UPDATE)
        return device

    def delete_device(self, device_id: int):
        device = self.db.get(Device, device_id)
        if not device:
            return None

        self.db.delete(device)
        self.db.commit()
        self.notify_change(device.id, NotifyChangeType.DEVICE_DELETE)

        relation_service = DeviceModelRelationService(self.db)
        relation_service.delete_relations_by_device(device_id)
        return device

    def get_device(self, device_id: int):
        return self.db.get(Device, device_id)