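# Newer / Older  (navigation links left over from the repository web viewer)
# safe-algo-pro / services / device_service.py
# zhangyingjie on 2 Nov, 4 KB: 增加甲烷查询任务及接口 (add methane query task and API)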
import asyncio
import threading
from datetime import datetime
from typing import Sequence, Optional, Tuple

from sqlalchemy import func
from sqlmodel import Session, select

from common.global_thread_pool import GlobalThreadPool
from common.consts import NotifyChangeType
from entity.device import Device, DeviceCreate, DeviceUpdate
from services.device_model_relation_service import DeviceModelRelationService


class DeviceService:
    """CRUD operations on ``Device`` rows plus change notification.

    Every instance owns a private asyncio event loop running on a daemon
    thread. Coroutine callbacks registered through
    ``register_change_callback`` are scheduled on that loop; plain callables
    are submitted to the ``GlobalThreadPool`` executor. Call ``shutdown``
    when the instance is no longer needed so the loop thread is stopped and
    the loop is closed.
    """

    def __init__(self, db: Session):
        """Bind the service to *db* and start the background event loop.

        Args:
            db: Session used for every query and commit issued by this
                service.
        """
        self.db = db
        self.__device_change_callbacks = []  # callables invoked by notify_change
        self.thread_pool = GlobalThreadPool()

        # Dedicated event loop, driven by its own daemon thread, so coroutine
        # callbacks can be scheduled from synchronous code.
        self.loop = asyncio.new_event_loop()
        self.loop_thread = threading.Thread(target=self._start_loop, daemon=True)
        self.loop_thread.start()

    def _start_loop(self):
        """Thread target: install ``self.loop`` in this thread and run it."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def shutdown(self):
        """Stop the background loop, join its thread and close the loop.

        Safe to call more than once: a second call finds the loop already
        closed and returns immediately.
        """
        if self.loop.is_closed():
            return
        # The loop runs on another thread, so the stop request has to be
        # handed over with call_soon_threadsafe.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.loop_thread.join()
        # A stopped loop still holds its selector / wake-up descriptors until
        # it is closed.
        self.loop.close()

    def register_change_callback(self, callback):
        """Register *callback* to be invoked on device changes.

        Args:
            callback: Callable or coroutine function; ``notify_change`` calls
                it with ``(device_id, change_type)``.
        """
        self.__device_change_callbacks.append(callback)

    def notify_change(self, device_id, change_type):
        """Dispatch a device change to every registered callback.

        Coroutine functions are scheduled on this service's background loop;
        any other callable is submitted to the thread pool executor. The
        futures returned by either path are discarded, so this method does
        not wait for callbacks and does not see their results or errors.

        Args:
            device_id: Identifier of the device that changed.
            change_type: Value describing the change; passed through as-is.
        """
        for callback in self.__device_change_callbacks:
            if not asyncio.iscoroutinefunction(callback):
                # Plain callable: hand it to the thread pool.
                self.thread_pool.executor.submit(callback, device_id, change_type)
                continue
            if self.loop.is_closed():
                # shutdown() already ran, so nothing can execute the
                # coroutine. Skip before creating the coroutine object to
                # avoid a "never awaited" warning.
                continue
            asyncio.run_coroutine_threadsafe(callback(device_id, change_type), self.loop)

    def get_device_list(self,
                        name: Optional[str] = None,
                        code: Optional[str] = None,
                        device_type: Optional[str] = None,
                        ) -> Sequence[Device]:
        """Return every device matching the optional filters.

        Args:
            name: Substring to match against ``Device.name``.
            code: Substring to match against ``Device.code``.
            device_type: Exact value to match against ``Device.type``.

        Returns:
            All matching ``Device`` rows.
        """
        statement = self.device_query(code, device_type, name)
        return self.db.exec(statement).all()

    def get_device_page(self,
                        name: Optional[str] = None,
                        code: Optional[str] = None,
                        device_type: Optional[str] = None,
                        offset: int = 0,
                        limit: int = 10
                        ) -> Tuple[Sequence[Device], int]:
        """Return one page of matching devices together with the total count.

        Args:
            name: Substring to match against ``Device.name``.
            code: Substring to match against ``Device.code``.
            device_type: Exact value to match against ``Device.type``.
            offset: Number of matching rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            ``(rows, total)`` where ``rows`` is the requested page and
            ``total`` is the number of rows matching the filters.
        """
        statement = self.device_query(code, device_type, name)

        # Total number of rows matching the filters, ignoring pagination.
        total_statement = select(func.count()).select_from(statement.subquery())
        total = self.db.exec(total_statement).one()

        # OFFSET/LIMIT without ORDER BY leaves the row order up to the
        # database, so pages could overlap or skip rows; order by primary key
        # to keep pages stable.
        page_statement = statement.order_by(Device.id).offset(offset).limit(limit)

        rows = self.db.exec(page_statement).all()
        return rows, total

    def device_query(self, code, device_type, name):
        """Build the ``select(Device)`` statement shared by the list queries.

        Falsy filter values are ignored. ``name`` and ``code`` are matched
        with ``LIKE '%value%'`` (so ``%`` and ``_`` inside the value act as
        wildcards); ``device_type`` is matched for equality.

        Returns:
            The (unexecuted) select statement.
        """
        statement = select(Device)
        if name:
            statement = statement.where(Device.name.like(f"%{name}%"))
        if code:
            statement = statement.where(Device.code.like(f"%{code}%"))
        if device_type:
            statement = statement.where(Device.type == device_type)
        return statement

    def create_device(self, device_data: DeviceCreate):
        """Insert a new device and notify listeners.

        Args:
            device_data: Payload validated into a ``Device``.

        Returns:
            The persisted ``Device``, refreshed from the database.
        """
        device = Device.model_validate(device_data)
        # One timestamp for both fields so a new row has identical
        # create/update times.
        now = datetime.now()
        device.create_time = now
        device.update_time = now
        self.db.add(device)
        self.db.commit()
        self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_CREATE)
        return device

    def update_device(self, device_data: DeviceUpdate):
        """Apply the fields set on *device_data* to an existing device.

        Args:
            device_data: Update payload; ``device_data.id`` selects the row
                and only fields explicitly set on the payload are written.

        Returns:
            The updated ``Device``, or ``None`` when no row has that id.
        """
        device = self.db.get(Device, device_data.id)
        if not device:
            return None

        # exclude_unset: leave columns the caller did not send untouched.
        for key, value in device_data.dict(exclude_unset=True).items():
            setattr(device, key, value)

        device.update_time = datetime.now()
        self.db.add(device)
        self.db.commit()
        self.db.refresh(device)
        self.notify_change(device.id, NotifyChangeType.DEVICE_UPDATE)
        return device

    def delete_device(self, device_id: int):
        """Delete a device, notify listeners and remove its model relations.

        Args:
            device_id: Primary key of the device to delete.

        Returns:
            The deleted ``Device`` instance, or ``None`` when no row has
            that id.
        """
        device = self.db.get(Device, device_id)
        if not device:
            return None

        # Capture the key while the instance is still persistent instead of
        # reading it from the object after it has been deleted and committed.
        deleted_id = device.id
        self.db.delete(device)
        self.db.commit()
        self.notify_change(deleted_id, NotifyChangeType.DEVICE_DELETE)

        relation_service = DeviceModelRelationService(self.db)
        relation_service.delete_relations_by_device(device_id)
        return device

    def get_device(self, device_id: int):
        """Return the device with primary key *device_id*, or ``None``."""
        return self.db.get(Device, device_id)