from datetime import datetime from typing import List from sqlmodel import Session, select, delete from common.consts import NotifyChangeType from common.global_thread_pool import GlobalThreadPool from entity.device_model_relation import DeviceModelRelation, DeviceModelRelationInfo, DeviceModelRelationCreate from entity.model import AlgoModel class DeviceModelRelationService: def __init__(self, db: Session): self.db = db self.relation_change_callbacks = [] # 用于存储回调函数 self.thread_pool = GlobalThreadPool() def register_change_callback(self, callback): """注册设备变化回调函数""" self.relation_change_callbacks.append(callback) def notify_change(self, device_id, change_type): """当设备发生变化时,调用回调通知变化""" for callback in self.relation_change_callbacks: self.thread_pool.executor.submit(callback, device_id, change_type) def get_device_models(self, device_id: int) -> List[DeviceModelRelationInfo]: statement = ( select(DeviceModelRelation, AlgoModel) .join(AlgoModel, DeviceModelRelation.algo_model_id == AlgoModel.id) .where(DeviceModelRelation.device_id == device_id) ) # 执行联表查询 result = self.db.exec(statement).all() models_info = [ DeviceModelRelationInfo( id=relation.id, device_id=relation.id, algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, alarm_interval=relation.alarm_interval, alarm_type=relation.alarm_type, algo_model_name=model.name, algo_model_version=model.version, algo_model_path=model.path, algo_model_remark=model.remark, ) for relation, model in result ] return models_info def add_relations_by_device(self, device_id: int, relations: List[DeviceModelRelationCreate]): new_relations = [ DeviceModelRelation( algo_model_id=relation.algo_model_id, is_use=relation.is_use, threshold=relation.threshold, alarm_interval=relation.alarm_interval, alarm_type=relation.alarm_type, device_id=device_id, # 统一赋值 device_id createtime=datetime.now(), updatetime=datetime.now(), ) for relation in relations ] self.db.add_all(new_relations) self.db.commit() for relation in new_relations: self.db.refresh(relation) return new_relations def delete_relations_by_device(self, device_id: int): statement = delete(DeviceModelRelation).where(DeviceModelRelation.device_id == device_id) count = self.db.exec(statement) self.db.commit() return count.rowcount def update_relations_by_device(self, device_id: int, relations: List[DeviceModelRelationCreate]): self.delete_relations_by_device(device_id) new_relations = self.add_relations_by_device(device_id, relations) self.notify_change(device_id, NotifyChangeType.DEVICE_MODEL_RELATION_UPDATE) return new_relations