import asyncio import copy import time from collections import deque from threading import Thread, Lock ''' 队列消息取出规则: - 按 alarmCategory(优先级:2 > 1 > 3 > 0)和 category_order 从小到大排序。 - 确保每个 alarmCategory 在 10 秒内只能发送一次。 消息定时清理: - 每次取消息时会检查队列中长期堆积、超过一定时间(比如 30 秒)的消息,并清理这些消息。 ''' class AlarmMessageCenter: def __init__(self, device_id, main_loop, tcp_manager=None, message_send_interval=5, category_interval=10, retention_time=30, category_priority=None): self.device_id = device_id self.tcp_manager = tcp_manager self.main_loop = main_loop self.queue = deque() self.last_sent_time = {} # 记录每个 alarmCategory 的最后发送时间 self.lock = Lock() self.message_send_interval = message_send_interval # 消息发送间隔(秒) self.category_interval = category_interval # 类别发送间隔(秒) self.retention_time = retention_time # 消息最长保留时间(秒) if category_priority: self.category_priority = category_priority self.auto_update_alarm_priority = False else: self.category_priority = category_priority self.auto_update_alarm_priority = True def add_message(self, message_ori): message = copy.deepcopy(message_ori) message['timestamp'] = int(time.time()) # 添加消息放入队列的时间 with self.lock: self.queue.append(message) # 动态更新优先级映射 if self.auto_update_alarm_priority: alarm_category = message['alarmCategory'] if alarm_category not in self.category_priority: unique_categories = sorted({msg['alarmCategory'] for msg in self.queue}) self.category_priority = {cat: idx for idx, cat in enumerate(unique_categories)} print(f"更新优先级映射: {self.category_priority}") def _clean_old_messages(self): """清理长期堆积的消息""" print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') now = time.time() with self.lock: self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time]) print(f'清理后的队列长度: {len(self.queue)}') # def _get_next_message(self): # """按优先级和时间规则取出下一条消息""" # now = time.time() # with self.lock: # # 按照规则排序队列 # sorted_queue = sorted( # self.queue, # key=lambda x: ( # self.alarm_priority.get(x['alarmCategory'], 4), # 按 alarmCategory 优先级排序 # x['category_order'], # category_order 小的排前面 # x['timestamp'] # 时间靠前的排前面 # ) # ) # # 遍历排序后的队列,找到符合规则的消息 # for msg in sorted_queue: # alarm_category = msg['alarmCategory'] # if alarm_category not in self.last_sent_time or now - self.last_sent_time[alarm_category] > 10: # self.queue.remove(msg) # self.last_sent_time[alarm_category] = now # return msg # return None def _get_next_message(self): """按优先级和时间规则取出下一条消息""" now = time.time() with self.lock: # 按优先级依次检查 for priority in sorted(self.category_priority.values()): found_valid_message = False # 用于标记当前优先级是否有消息被处理 for msg in sorted( (m for m in self.queue if self.category_priority.get(m['alarmCategory'], 99) == priority), key=lambda x: (-x['timestamp'], x['category_order']) ): alarm_category = msg['alarmCategory'] # 检查是否符合发送条件 if alarm_category not in self.last_sent_time or now - self.last_sent_time[ alarm_category] > self.category_interval: self.queue.remove(msg) self.last_sent_time[alarm_category] = now return msg # 找到符合条件的消息立即返回 found_valid_message = True # 当前优先级存在消息但不符合条件 # 如果当前优先级的所有消息都被检查过且不符合条件,跳到下一个优先级 if not found_valid_message: continue return None # 如果所有优先级都没有消息符合条件,则返回 None def process_messages(self): while True: time.sleep(self.message_send_interval) self._clean_old_messages() # 清理长期堆积的消息 next_message = self._get_next_message() if next_message: self.send_message(next_message) def send_message(self, message): """发送报警消息""" print(f"发送报警消息: {message['alarmContent']} (类别: {message['alarmCategory']}, 时间: {message['timestamp']})") if self.tcp_manager: asyncio.run_coroutine_threadsafe( self.tcp_manager.send_message_to_device(device_id=self.device_id, message=message['alarmSoundMessage'], have_response=False), self.main_loop)