Newer
Older
safe-algo-pro / services / device_scene_relation_service.py
from datetime import datetime
from typing import List, Optional

from sqlmodel import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from common.consts import NotifyChangeType
from common.global_thread_pool import GlobalThreadPool
from entity.device_scene_relation import DeviceSceneRelationInfo, DeviceSceneRelation
from entity.scene import Scene


class DeviceSceneRelationService:
    """Manage the relation rows that bind a device to a scene.

    The service reads and writes ``DeviceSceneRelation`` rows through the
    async session it is given, and lets other components register callbacks
    that are dispatched on the shared thread pool when a device's relation is
    updated.
    """

    def __init__(self, db: AsyncSession):
        """Store the session and prepare the callback list and thread pool.

        Args:
            db: Async database session used for every query in this service.
        """
        self.db = db
        self.__relation_change_callbacks = []  # callbacks invoked by notify_change
        self.thread_pool = GlobalThreadPool()

    def register_change_callback(self, callback):
        """Register a callback to be invoked when a relation changes.

        Args:
            callback: Callable invoked as ``callback(device_id, change_type)``.
        """
        self.__relation_change_callbacks.append(callback)

    def notify_change(self, device_id, change_type):
        """Submit every registered callback to the thread pool executor.

        Args:
            device_id: Identifier of the device whose relation changed.
            change_type: Value describing the change; passed through unchanged.
        """
        for callback in self.__relation_change_callbacks:
            self.thread_pool.executor.submit(callback, device_id, change_type)

    async def get_device_scene(self, device_id: int) -> Optional[DeviceSceneRelationInfo]:
        """Return the scene bound to a device, joined with the scene's details.

        Args:
            device_id: Identifier of the device to look up.

        Returns:
            A ``DeviceSceneRelationInfo`` built from the first matching
            relation/scene pair, or ``None`` when no relation exists.
        """
        statement = (
            select(DeviceSceneRelation, Scene)
            .join(Scene, DeviceSceneRelation.scene_id == Scene.id)
            .where(DeviceSceneRelation.device_id == device_id)
        )

        # Run the joined query; only the first matching row is used.
        result = await self.db.execute(statement)
        result_row = result.first()
        if result_row is None:
            return None

        relation, scene = result_row
        return DeviceSceneRelationInfo(
            id=relation.id,
            device_id=relation.device_id,
            scene_id=relation.scene_id,
            scene_name=scene.name,
            scene_version=scene.version,
            scene_handle_task=scene.handle_task,
            scene_remark=scene.remark,
            range_points=relation.range_points,
        )

    @staticmethod
    def _build_relation(device_id: int, scene_id: int, range_points: Optional[str]) -> DeviceSceneRelation:
        """Create an unsaved relation whose two timestamps are identical."""
        new_relation = DeviceSceneRelation(device_id=device_id, scene_id=scene_id, range_points=range_points)
        # Take the clock once so create_time and update_time cannot differ
        # on a freshly created row.
        now = datetime.now()
        new_relation.create_time = now
        new_relation.update_time = now
        return new_relation

    async def add_relation_by_device(self, device_id: int, scene_id: int,
                                     range_points: Optional[str] = None) -> DeviceSceneRelation:
        """Insert a new device/scene relation and commit it.

        Args:
            device_id: Identifier of the device.
            scene_id: Identifier of the scene to bind to the device.
            range_points: Optional range-points value stored on the relation.

        Returns:
            The persisted relation, refreshed from the database.
        """
        new_relation = self._build_relation(device_id, scene_id, range_points)
        self.db.add(new_relation)
        await self.db.commit()
        await self.db.refresh(new_relation)
        return new_relation

    async def delete_relation_by_device(self, device_id: int) -> int:
        """Delete every relation of a device and commit.

        Args:
            device_id: Identifier of the device whose relations are removed.

        Returns:
            The number of rows reported as deleted by the database.
        """
        statement = delete(DeviceSceneRelation).where(DeviceSceneRelation.device_id == device_id)
        result = await self.db.execute(statement)
        await self.db.commit()
        return result.rowcount

    async def update_relation_by_device(self, device_id: int, scene_id: int,
                                        range_points: Optional[str] = None) -> DeviceSceneRelation:
        """Replace a device's relation with a new one and notify listeners.

        The old rows are deleted and the new row is inserted inside a single
        transaction, so a failure while inserting cannot leave the device
        without any relation.

        Args:
            device_id: Identifier of the device.
            scene_id: Identifier of the scene to bind to the device.
            range_points: Optional range-points value stored on the relation.

        Returns:
            The newly persisted relation, refreshed from the database.

        Raises:
            Exception: Any error raised by the session is re-raised after the
                transaction has been rolled back.
        """
        new_relation = self._build_relation(device_id, scene_id, range_points)
        try:
            # Execute the delete before adding the new object so the old rows
            # are removed ahead of the insert that is flushed at commit time.
            await self.db.execute(
                delete(DeviceSceneRelation).where(DeviceSceneRelation.device_id == device_id)
            )
            self.db.add(new_relation)
            await self.db.commit()
        except Exception:
            # Undo the pending delete as well; the previous relation survives.
            await self.db.rollback()
            raise
        await self.db.refresh(new_relation)
        self.notify_change(device_id, NotifyChangeType.DEVICE_SCENE_RELATION_UPDATE)
        return new_relation