import asyncio
import logging
import threading
from datetime import datetime
from typing import Any, Callable, List, Optional, Sequence, Tuple

from sqlalchemy import func
from sqlmodel import Session, select

from common.consts import NotifyChangeType
from common.global_thread_pool import GlobalThreadPool
from entity.device import Device, DeviceCreate, DeviceUpdate
from services.device_model_relation_service import DeviceModelRelationService

logger = logging.getLogger(__name__)


class DeviceService:
    """Query and modify ``Device`` rows and notify listeners about changes.

    Every instance creates its own asyncio event loop and runs it on a
    dedicated daemon thread; coroutine callbacks are scheduled on that loop.
    Call :meth:`shutdown` when the service is no longer needed so the loop
    and its thread are released.
    """

    def __init__(self, db: Session):
        """Store the session and start the background event loop.

        Args:
            db: Session used for every query and commit issued by this
                service.
        """
        self.db = db
        # Callbacks invoked by notify_change(), in registration order.
        self.__device_change_callbacks: List[Callable[..., Any]] = []
        self.thread_pool = GlobalThreadPool()
        # Dedicated event loop, driven by its own daemon thread.
        self.loop = asyncio.new_event_loop()
        self.loop_thread = threading.Thread(target=self._start_loop, daemon=True)
        self.loop_thread.start()

    def _start_loop(self) -> None:
        """Run the event loop on the background thread until it is stopped."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def shutdown(self) -> None:
        """Stop the background event loop, join its thread and close the loop.

        Safe to call more than once; calls after the first are no-ops.
        """
        if self.loop.is_closed():
            return
        # stop() must be scheduled from outside the loop thread in a
        # thread-safe way; run_forever() returns once it has been processed.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.loop_thread.join()
        # Closing releases the loop's selector and self-pipe file descriptors,
        # which would otherwise stay open until garbage collection.
        self.loop.close()

    def register_change_callback(self, callback: Callable[..., Any]) -> None:
        """Register a callback to be invoked on every device change.

        Args:
            callback: A plain function or a coroutine function. It is called
                as ``callback(device_id, change_type)``.
        """
        self.__device_change_callbacks.append(callback)

    def notify_change(self, device_id, change_type) -> None:
        """Dispatch a change notification to every registered callback.

        Coroutine functions are scheduled on the service's event loop; plain
        functions are submitted to the thread pool executor. Dispatch is
        fire-and-forget: this method does not wait for callbacks to finish.
        Exceptions raised while a callback runs are logged.

        Args:
            device_id: Identifier of the device that changed.
            change_type: The kind of change being reported.
        """
        for callback in self.__device_change_callbacks:
            if asyncio.iscoroutinefunction(callback):
                if self.loop.is_closed():
                    # After shutdown() there is no loop left to run the
                    # coroutine on; skip it instead of raising from a CRUD call.
                    logger.warning(
                        "Event loop is closed; dropping notification %r for device %r",
                        change_type,
                        device_id,
                    )
                    continue
                future = asyncio.run_coroutine_threadsafe(
                    callback(device_id, change_type), self.loop
                )
                # Nobody awaits this future, so surface failures via logging.
                future.add_done_callback(self._log_future_failure)
            else:
                self.thread_pool.executor.submit(
                    self._run_sync_callback, callback, device_id, change_type
                )

    @staticmethod
    def _log_future_failure(future) -> None:
        """Log the exception, if any, of a finished coroutine callback."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error("Device change callback failed", exc_info=error)

    @staticmethod
    def _run_sync_callback(callback, device_id, change_type):
        """Run a plain callback, logging any exception before re-raising it."""
        try:
            return callback(device_id, change_type)
        except Exception:
            logger.exception("Device change callback failed")
            raise

    def get_device_list(self,
                        name: Optional[str] = None,
                        code: Optional[str] = None,
                        device_type: Optional[str] = None,
                        ) -> Sequence[Device]:
        """Return every device matching the optional filters.

        Args:
            name: Substring to match against the device name.
            code: Substring to match against the device code.
            device_type: Exact device type to match.
        """
        statement = self.device_query(code, device_type, name)
        return self.db.exec(statement).all()

    def get_device_page(self,
                        name: Optional[str] = None,
                        code: Optional[str] = None,
                        device_type: Optional[str] = None,
                        offset: int = 0,
                        limit: int = 10
                        ) -> Tuple[Sequence[Device], int]:
        """Return one page of matching devices and the total match count.

        Args:
            name: Substring to match against the device name.
            code: Substring to match against the device code.
            device_type: Exact device type to match.
            offset: Number of matching rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            A ``(devices, total)`` tuple, where ``total`` counts all rows
            matching the filters regardless of ``offset`` and ``limit``.
        """
        statement = self.device_query(code, device_type, name)

        # Count all matching rows before paging is applied.
        total_statement = select(func.count()).select_from(statement.subquery())
        total = self.db.exec(total_statement).one()

        # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so
        # consecutive pages could overlap or skip rows; order by primary key.
        page_statement = statement.order_by(Device.id).offset(offset).limit(limit)
        devices = self.db.exec(page_statement).all()
        return devices, total

    def device_query(self, code: Optional[str], device_type: Optional[str],
                     name: Optional[str]):
        """Build the ``select(Device)`` statement for the given filters.

        Falsy filters are ignored. ``name`` and ``code`` are matched with
        ``LIKE '%value%'``; ``device_type`` is matched by equality.
        """
        statement = select(Device)
        if name:
            statement = statement.where(Device.name.like(f"%{name}%"))
        if code:
            statement = statement.where(Device.code.like(f"%{code}%"))
        if device_type:
            statement = statement.where(Device.type == device_type)
        return statement

    def create_device(self, device_data: DeviceCreate) -> Device:
        """Insert a new device, commit, and emit ``DEVICE_CREATE``.

        Returns:
            The persisted device, refreshed from the database.
        """
        device = Device.model_validate(device_data)
        # Use a single timestamp so both fields are identical on creation.
        now = datetime.now()
        device.create_time = now
        device.update_time = now
        self.db.add(device)
        self.db.commit()
        self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_CREATE)
        return device

    def update_device(self, device_data: DeviceUpdate) -> Optional[Device]:
        """Apply the explicitly-set fields of ``device_data`` to a device.

        Commits the change and emits ``DEVICE_UPDATE``.

        Returns:
            The updated device, or ``None`` when no device has the given id.
        """
        device = self.db.get(Device, device_data.id)
        if not device:
            return None
        # Only fields the caller actually set are copied onto the row.
        # NOTE(review): `.dict()` is deprecated in Pydantic v2 in favour of
        # `model_dump()`; kept because the Pydantic version of DeviceUpdate is
        # not visible from this file -- confirm before switching.
        update_data = device_data.dict(exclude_unset=True)
        for key, value in update_data.items():
            setattr(device, key, value)
        device.update_time = datetime.now()
        self.db.add(device)
        self.db.commit()
        self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_UPDATE)
        return device

    def delete_device(self, device_id: int) -> Optional[Device]:
        """Delete a device and its model relations, then emit ``DEVICE_DELETE``.

        Returns:
            The deleted device object, or ``None`` when no device has the
            given id.
        """
        device = self.db.get(Device, device_id)
        if not device:
            return None
        self.db.delete(device)
        self.db.commit()
        relation_service = DeviceModelRelationService(self.db)
        try:
            relation_service.delete_relations_by_device(device_id)
        finally:
            # Notify only after the relation cleanup has been attempted so
            # listeners do not race with it; `finally` guarantees listeners
            # still learn about the already-committed device deletion.
            self.notify_change(device_id, NotifyChangeType.DEVICE_DELETE)
        return device

    def get_device(self, device_id: int) -> Optional[Device]:
        """Return the device with the given id, or ``None`` if it is absent."""
        return self.db.get(Device, device_id)