from datetime import datetime from typing import Sequence, Optional, Tuple from sqlalchemy import func from sqlmodel import Session, select from common.global_thread_pool import GlobalThreadPool from common.consts import NotifyChangeType from entity.device import Device, DeviceCreate, DeviceUpdate from services.device_model_relation_service import DeviceModelRelationService class DeviceService: def __init__(self, db: Session): self.db = db self.device_change_callbacks = [] # 用于存储回调函数 self.thread_pool = GlobalThreadPool() def register_change_callback(self, callback): """注册设备变化回调函数""" self.device_change_callbacks.append(callback) def notify_change(self, device_id, change_type): """当设备发生变化时,调用回调通知变化""" for callback in self.device_change_callbacks: self.thread_pool.executor.submit(callback, device_id, change_type) def get_device_list(self, name: Optional[str] = None, code: Optional[str] = None, device_type: Optional[str] = None, ) -> Sequence[Device]: statement = self.device_query(code, device_type, name) results = self.db.exec(statement) return results.all() def get_device_page(self, name: Optional[str] = None, code: Optional[str] = None, device_type: Optional[str] = None, offset: int = 0, limit: int = 10 ) -> Tuple[Sequence[Device], int]: statement = self.device_query(code, device_type, name) # 查询总记录数 total_statement = select(func.count()).select_from(statement.subquery()) total = self.db.exec(total_statement).one() # 添加分页限制 statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 results = self.db.exec(statement) return results.all(), total # 返回分页数据和总数 def device_query(self, code, device_type, name): # 构建查询语句 statement = select(Device) if name: statement = statement.where(Device.name.like(f"%{name}%")) if code: statement = statement.where(Device.code.like(f"%{code}%")) if device_type: statement = statement.where(Device.type == device_type) return statement def create_device(self, device_data: DeviceCreate): device = Device.model_validate(device_data) device.create_time = datetime.now() device.update_time = datetime.now() self.db.add(device) self.db.commit() self.db.refresh(device) self.notify_change(device.id, NotifyChangeType.DEVICE_CREATE) return device def update_device(self, device_data: DeviceUpdate): device = self.db.get(Device, device_data.id) if not device: return None update_data = device_data.dict(exclude_unset=True) for key, value in update_data.items(): setattr(device, key, value) device.update_time = datetime.now() self.db.add(device) self.db.commit() self.db.refresh(device) self.notify_change(device.id, NotifyChangeType.DEVICE_UPDATE) return device def delete_device(self, device_id: int): device = self.db.get(Device, device_id) if not device: return None self.db.delete(device) self.db.commit() self.notify_change(device.id, NotifyChangeType.DEVICE_DELETE) relation_service = DeviceModelRelationService(self.db) relation_service.delete_relations_by_device(device_id) return device def get_device(self, device_id: int): return self.db.get(Device, device_id)