Newer
Older
safe-algo-pro / services / device_service.py
import asyncio
import threading
from datetime import datetime
from typing import Sequence, Optional, Tuple

from sqlalchemy import func
from sqlmodel import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from common.device_status_manager import DeviceStatusManager
from common.global_thread_pool import GlobalThreadPool
from common.consts import NotifyChangeType, DEVICE_MODE
from entity.device import Device, DeviceCreate, DeviceUpdate, DeviceInfo
from services.device_model_relation_service import DeviceModelRelationService
from services.device_scene_relation_service import DeviceSceneRelationService


class DeviceService:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.__device_change_callbacks = []  # 用于存储回调函数
        self.thread_pool = GlobalThreadPool()

        # 创建一个独立的事件循环并启动线程
        # self.loop = asyncio.new_event_loop()
        # self.loop_thread = threading.Thread(target=self._start_loop, daemon=True)
        # self.loop_thread.start()

    # def _start_loop(self):
    #     """后台线程运行事件循环"""
    #     asyncio.set_event_loop(self.loop)
    #     self.loop.run_forever()
    #
    # def shutdown(self):
    #     """清理事件循环和线程"""
    #     self.loop.call_soon_threadsafe(self.loop.stop)
    #     self.loop_thread.join()

    def register_change_callback(self, callback):
        """注册设备变化回调函数"""
        self.__device_change_callbacks.append(callback)

    def notify_change(self, device_id, change_type):
        """当设备发生变化时,调用回调通知变化"""
        # loop = asyncio.get_event_loop()  # 获取当前的事件循环
        # for callback in self.__device_change_callbacks:
        #     if asyncio.iscoroutinefunction(callback):
        #         # 如果是协程函数,使用事件循环运行它
        #         # loop = asyncio.new_event_loop()
        #         # asyncio.set_event_loop(loop)
        #         asyncio.run_coroutine_threadsafe(callback(device_id, change_type), self.loop)
        #     else:
        #         # 如果是普通函数,直接提交到线程池
        #         self.thread_pool.executor.submit(callback, device_id, change_type)

    async def get_device_list(self,
                              name: Optional[str] = None,
                              code: Optional[str] = None,
                              device_type: Optional[str] = None,
                              ) -> Sequence[Device]:
        statement = self.device_query(code, device_type, name)
        results = await self.db.execute(statement)
        return results.scalars().all()

    async def get_device_page(self,
                              name: Optional[str] = None,
                              code: Optional[str] = None,
                              device_type: Optional[str] = None,
                              offset: int = 0,
                              limit: int = 10
                              ) -> Tuple[Sequence[DeviceInfo], int]:
        statement = self.device_query(code, device_type, name)

        # 查询总记录数
        total_statement = select(func.count()).select_from(statement.subquery())
        total_result = await self.db.execute(total_statement)
        total = total_result.scalar_one()

        # 添加分页限制
        statement = statement.offset(offset).limit(limit)

        # 执行查询并返回结果
        results = await self.db.execute(statement)
        device_list = results.scalars().all()
        device_info_list = []
        if device_list:
            device_model_relation_service = DeviceModelRelationService(self.db)
            device_scene_relation_service = DeviceSceneRelationService(self.db)
            device_status_manager = DeviceStatusManager()
            for device in device_list:
                model_relations = await device_model_relation_service.get_device_models(device.id)
                scene_relation = await device_scene_relation_service.get_device_scene(device.id)

                device_info_list.append(DeviceInfo(
                    id=device.id,
                    name=device.name,
                    code=device.code,
                    type=device.type,
                    ip=device.ip,
                    gas_ip=device.gas_ip,
                    mode=device.mode,
                    input_stream_url=device.input_stream_url,
                    output_stream_url=device.output_stream_url,
                    image_save_interval=device.image_save_interval,
                    alarm_interval=device.alarm_interval,

                    status="在线" if device_status_manager.get_status(device.id) else "离线",
                    relation_model_names=", ".join(
                        [relation.algo_model_name for relation in model_relations if relation.is_use == 1]
                    ) if model_relations else "—",
                    relation_scene_name=scene_relation.scene_name if scene_relation else "—"
                ))
        return device_info_list, total  # 返回分页数据和总数

    def device_query(self, code, device_type, name):
        # 构建查询语句
        statement = select(Device)
        if name:
            statement = statement.where(Device.name.like(f"%{name}%"))
        if code:
            statement = statement.where(Device.code.like(f"%{code}%"))
        if device_type:
            statement = statement.where(Device.type == device_type)
        return statement

    async def create_device(self, device_data: DeviceCreate):
        device = Device.model_validate(device_data)
        device.create_time = datetime.now()
        device.update_time = datetime.now()
        await self.handle_device_mode(device)
        self.db.add(device)
        await self.db.commit()
        await self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_CREATE)
        return device

    async def update_device(self, device_data: DeviceUpdate):
        device_old = await self.get_device(device_data.id)
        device = await self.get_device(device_data.id)
        if not device:
            return None

        update_data = device_data.dict(exclude_unset=True)
        for key, value in update_data.items():
            setattr(device, key, value)

        await self.handle_device_mode(device)
        device.update_time = datetime.now()
        self.db.add(device)
        await self.db.commit()
        await self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_UPDATE)
        return device

    async def delete_device(self, device_id: int):
        device = await self.get_device(device_id)
        if not device:
            return None

        statement = delete(Device).where(Device.id == device_id)
        await self.db.execute(statement)
        await self.db.commit()

        self.notify_change(device.id, NotifyChangeType.DEVICE_DELETE)

        model_relation_service = DeviceModelRelationService(self.db)
        await model_relation_service.delete_relations_by_device(device_id)
        scene_relation_service = DeviceSceneRelationService(self.db)
        await scene_relation_service.delete_relation_by_device(device.id)
        return device

    async def handle_device_mode(self, device):
        if device.mode == DEVICE_MODE.ALGO:
            scene_relation_service = DeviceSceneRelationService(self.db)
            await scene_relation_service.delete_relation_by_device(device.id)
        elif device.mode == DEVICE_MODE.SCENE:
            model_relation_service = DeviceModelRelationService(self.db)
            await model_relation_service.delete_relations_by_device(device.id)
        else:
            scene_relation_service = DeviceSceneRelationService(self.db)
            await scene_relation_service.delete_relation_by_device(device.id)
            model_relation_service = DeviceModelRelationService(self.db)
            await model_relation_service.delete_relations_by_device(device.id)

    async def get_device(self, device_id: int):
        result = await self.db.execute(select(Device).where(Device.id == device_id))
        frame = result.scalar_one_or_none()
        return frame