Newer
Older
safe-algo-pro / scene_handler / alarm_message_center.py
import asyncio
import copy
import time
from collections import deque
from threading import Thread, Lock

'''
队列消息取出规则:
- 按 alarmCategory(优先级:2 > 1 > 3 > 0)和 category_order 从小到大排序。
- 确保每个 alarmCategory 在 10 秒内只能发送一次。

消息定时清理:
- 每次取消息时会检查队列中长期堆积、超过一定时间(比如 30 秒)的消息,并清理这些消息。
'''


class AlarmMessageCenter:
    def __init__(self, device_id, main_loop, tcp_manager=None, message_send_interval=5, category_interval=10, retention_time=30,
                 category_priority=None):
        self.device_id = device_id
        self.tcp_manager = tcp_manager
        self.main_loop = main_loop

        self.queue = deque()
        self.last_sent_time = {}  # 记录每个 alarmCategory 的最后发送时间
        self.lock = Lock()
        self.message_send_interval = message_send_interval  # 消息发送间隔(秒)
        self.category_interval = category_interval  # 类别发送间隔(秒)
        self.retention_time = retention_time  # 消息最长保留时间(秒)
        if category_priority:
            self.category_priority = category_priority
            self.auto_update_alarm_priority = False
        else:
            self.category_priority = category_priority
            self.auto_update_alarm_priority = True

    def add_message(self, message_ori):
        message = copy.deepcopy(message_ori)
        message['timestamp'] = int(time.time())  # 添加消息放入队列的时间
        with self.lock:
            self.queue.append(message)

        # 动态更新优先级映射
        if self.auto_update_alarm_priority:
            alarm_category = message['alarmCategory']
            if alarm_category not in self.category_priority:
                unique_categories = sorted({msg['alarmCategory'] for msg in self.queue})
                self.category_priority = {cat: idx for idx, cat in enumerate(unique_categories)}
                print(f"更新优先级映射: {self.category_priority}")

    def _clean_old_messages(self):
        """清理长期堆积的消息"""
        print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})')
        now = time.time()
        with self.lock:
            self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time])
        print(f'清理后的队列长度: {len(self.queue)}')

    # def _get_next_message(self):
    #     """按优先级和时间规则取出下一条消息"""
    #     now = time.time()
    #     with self.lock:
    #         # 按照规则排序队列
    #         sorted_queue = sorted(
    #             self.queue,
    #             key=lambda x: (
    #                 self.alarm_priority.get(x['alarmCategory'], 4),  # 按 alarmCategory 优先级排序
    #                 x['category_order'],  # category_order 小的排前面
    #                 x['timestamp']        # 时间靠前的排前面
    #             )
    #         )
    #         # 遍历排序后的队列,找到符合规则的消息
    #         for msg in sorted_queue:
    #             alarm_category = msg['alarmCategory']
    #             if alarm_category not in self.last_sent_time or now - self.last_sent_time[alarm_category] > 10:
    #                 self.queue.remove(msg)
    #                 self.last_sent_time[alarm_category] = now
    #                 return msg
    #     return None

    def _get_next_message(self):
        """按优先级和时间规则取出下一条消息"""
        now = time.time()
        with self.lock:
            # 按优先级依次检查
            for priority in sorted(self.category_priority.values()):
                found_valid_message = False  # 用于标记当前优先级是否有消息被处理
                for msg in sorted(
                        (m for m in self.queue if self.category_priority.get(m['alarmCategory'], 99) == priority),
                        key=lambda x: (-x['timestamp'], x['category_order'])
                ):
                    alarm_category = msg['alarmCategory']
                    # 检查是否符合发送条件
                    if alarm_category not in self.last_sent_time or now - self.last_sent_time[
                        alarm_category] > self.category_interval:
                        self.queue.remove(msg)
                        self.last_sent_time[alarm_category] = now
                        return msg  # 找到符合条件的消息立即返回
                    found_valid_message = True  # 当前优先级存在消息但不符合条件
                # 如果当前优先级的所有消息都被检查过且不符合条件,跳到下一个优先级
                if not found_valid_message:
                    continue
            return None  # 如果所有优先级都没有消息符合条件,则返回 None

    def process_messages(self):
        while True:
            time.sleep(self.message_send_interval)
            self._clean_old_messages()  # 清理长期堆积的消息
            next_message = self._get_next_message()
            if next_message:
                self.send_message(next_message)

    def send_message(self, message):
        """发送报警消息"""
        print(f"发送报警消息: {message['alarmContent']} (类别: {message['alarmCategory']}, 时间: {message['timestamp']})")
        if self.tcp_manager:
            asyncio.run_coroutine_threadsafe(
                self.tcp_manager.send_message_to_device(device_id=self.device_id,
                                                        message=message['alarmSoundMessage'],
                                                        have_response=False),
                self.main_loop)