import asyncio import threading from datetime import datetime from typing import Sequence, Optional, Tuple from sqlalchemy import func from sqlmodel import select, delete from sqlalchemy.ext.asyncio import AsyncSession from common.device_status_manager import DeviceStatusManager from common.global_thread_pool import GlobalThreadPool from common.consts import NotifyChangeType, DEVICE_MODE from entity.device import Device, DeviceCreate, DeviceUpdate, DeviceInfo from services.device_model_relation_service import DeviceModelRelationService from services.device_scene_relation_service import DeviceSceneRelationService class DeviceService: def __init__(self, db: AsyncSession): self.db = db self.__device_change_callbacks = [] # 用于存储回调函数 self.thread_pool = GlobalThreadPool() # 创建一个独立的事件循环并启动线程 # self.loop = asyncio.new_event_loop() # self.loop_thread = threading.Thread(target=self._start_loop, daemon=True) # self.loop_thread.start() # def _start_loop(self): # """后台线程运行事件循环""" # asyncio.set_event_loop(self.loop) # self.loop.run_forever() # # def shutdown(self): # """清理事件循环和线程""" # self.loop.call_soon_threadsafe(self.loop.stop) # self.loop_thread.join() def register_change_callback(self, callback): """注册设备变化回调函数""" self.__device_change_callbacks.append(callback) def notify_change(self, device_id, change_type): """当设备发生变化时,调用回调通知变化""" # loop = asyncio.get_event_loop() # 获取当前的事件循环 # for callback in self.__device_change_callbacks: # if asyncio.iscoroutinefunction(callback): # # 如果是协程函数,使用事件循环运行它 # # loop = asyncio.new_event_loop() # # asyncio.set_event_loop(loop) # asyncio.run_coroutine_threadsafe(callback(device_id, change_type), self.loop) # else: # # 如果是普通函数,直接提交到线程池 # self.thread_pool.executor.submit(callback, device_id, change_type) async def get_device_list(self, name: Optional[str] = None, code: Optional[str] = None, device_type: Optional[str] = None, ) -> Sequence[Device]: statement = self.device_query(code, device_type, name) results = await self.db.execute(statement) return results.scalars().all() async def get_device_page(self, name: Optional[str] = None, code: Optional[str] = None, device_type: Optional[str] = None, offset: int = 0, limit: int = 10 ) -> Tuple[Sequence[DeviceInfo], int]: statement = self.device_query(code, device_type, name) # 查询总记录数 total_statement = select(func.count()).select_from(statement.subquery()) total_result = await self.db.execute(total_statement) total = total_result.scalar_one() # 添加分页限制 statement = statement.offset(offset).limit(limit) # 执行查询并返回结果 results = await self.db.execute(statement) device_list = results.scalars().all() device_info_list = [] if device_list: device_model_relation_service = DeviceModelRelationService(self.db) device_scene_relation_service = DeviceSceneRelationService(self.db) device_status_manager = DeviceStatusManager() for device in device_list: model_relations = await device_model_relation_service.get_device_models(device.id) scene_relation = await device_scene_relation_service.get_device_scene(device.id) device_info_list.append(DeviceInfo( id=device.id, name=device.name, code=device.code, type=device.type, ip=device.ip, gas_ip=device.gas_ip, mode=device.mode, input_stream_url=device.input_stream_url, output_stream_url=device.output_stream_url, image_save_interval=device.image_save_interval, alarm_interval=device.alarm_interval, status="在线" if device_status_manager.get_status(device.id) else "离线", relation_model_names=", ".join( [relation.algo_model_name for relation in model_relations if relation.is_use == 1] ) if model_relations else "—", relation_scene_name=scene_relation.scene_name if scene_relation else "—" )) return device_info_list, total # 返回分页数据和总数 def device_query(self, code, device_type, name): # 构建查询语句 statement = select(Device) if name: statement = statement.where(Device.name.like(f"%{name}%")) if code: statement = statement.where(Device.code.like(f"%{code}%")) if device_type: statement = statement.where(Device.type == device_type) return statement async def create_device(self, device_data: DeviceCreate): device = Device.model_validate(device_data) device.create_time = datetime.now() device.update_time = datetime.now() await self.handle_device_mode(device) self.db.add(device) await self.db.commit() await self.db.refresh(device) self.notify_change(device.id, NotifyChangeType.DEVICE_CREATE) return device async def update_device(self, device_data: DeviceUpdate): device_old = await self.get_device(device_data.id) device = await self.get_device(device_data.id) if not device: return None update_data = device_data.dict(exclude_unset=True) for key, value in update_data.items(): setattr(device, key, value) await self.handle_device_mode(device) device.update_time = datetime.now() self.db.add(device) await self.db.commit() await self.db.refresh(device) self.notify_change(device.id, NotifyChangeType.DEVICE_UPDATE) return device async def delete_device(self, device_id: int): device = await self.get_device(device_id) if not device: return None statement = delete(Device).where(Device.id == device_id) await self.db.execute(statement) await self.db.commit() self.notify_change(device.id, NotifyChangeType.DEVICE_DELETE) model_relation_service = DeviceModelRelationService(self.db) await model_relation_service.delete_relations_by_device(device_id) scene_relation_service = DeviceSceneRelationService(self.db) await scene_relation_service.delete_relation_by_device(device.id) return device async def handle_device_mode(self, device): if device.mode == DEVICE_MODE.ALGO: scene_relation_service = DeviceSceneRelationService(self.db) await scene_relation_service.delete_relation_by_device(device.id) elif device.mode == DEVICE_MODE.SCENE: model_relation_service = DeviceModelRelationService(self.db) await model_relation_service.delete_relations_by_device(device.id) else: scene_relation_service = DeviceSceneRelationService(self.db) await scene_relation_service.delete_relation_by_device(device.id) model_relation_service = DeviceModelRelationService(self.db) await model_relation_service.delete_relations_by_device(device.id) async def get_device(self, device_id: int): result = await self.db.execute(select(Device).where(Device.id == device_id)) frame = result.scalar_one_or_none() return frame