Newer
Older
safe-algo-pro / services / push_config_service.py
zhangyingjie on 7 Nov 1 KB 增加数据推送功能
from datetime import datetime

from sqlmodel import Session, select

from entity.push_config import PushConfigCreate, PushConfig


class PushConfigService:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, db: Session):
        if not hasattr(self, 'initialized'):
            self.db = db
            self.__push_change_callbacks = []  # 用于存储回调函数
            self.initialized = True

    def register_change_callback(self, callback):
        """注册设备变化回调函数"""
        self.__push_change_callbacks.append(callback)

    def notify_change(self, push_config):
        for callback in self.__push_change_callbacks:
            callback(push_config)

    def set_push_config(self, push_config_create: PushConfigCreate):
        push_config = self.get_push_config(push_config_create.push_type)
        if push_config:
            update_data = push_config_create.dict(exclude_unset=True)
            for key, value in update_data.items():
                setattr(push_config, key, value)
            push_config.update_time = datetime.now()
        else:
            push_config = PushConfig.model_validate(push_config_create)
            push_config.create_time = datetime.now()
            push_config.update_time = datetime.now()

        self.db.add(push_config)
        self.db.commit()
        self.db.refresh(push_config)

        self.notify_change(push_config)
        return push_config

    def get_push_config(self, push_type):
        statement = select(PushConfig).where(PushConfig.push_type == push_type)
        results = self.db.exec(statement)
        return results.first()

    def get_push_config_list(self):
        statement = select(PushConfig)
        results = self.db.exec(statement)
        return results.all()