import asyncio import copy import time from collections import deque from threading import Thread, Lock from common.global_logger import logger from common.ipc_utils import send_to_tcp ''' 队列消息取出规则: - 按 alarmCategory和 category_order 从小到大排序。 - 确保每个 alarmCategory 在 10 秒内只能发送一次。 消息定时清理: - 每次取消息时会检查队列中长期堆积、超过一定时间(比如 30 秒)的消息,并清理这些消息。 ''' class AlarmMessageCenter: def __init__(self, device_id, main_loop, tcp_manager=None, message_send_interval=5, category_interval=10, retention_time=30, category_priority=None): self.device_id = device_id self.tcp_manager = tcp_manager self.main_loop = main_loop self.queue = deque() self.last_sent_time = {} # 记录每个 alarmCategory 的最后发送时间 self.lock = Lock() self.message_send_interval = message_send_interval # 消息发送间隔(秒) self.category_interval = category_interval # 类别发送间隔(秒) self.retention_time = retention_time # 消息最长保留时间(秒) if category_priority: self.category_priority = category_priority self.auto_update_alarm_priority = False else: self.category_priority = category_priority self.auto_update_alarm_priority = True def add_message(self, message_ori): message = copy.deepcopy(message_ori) message['timestamp'] = int(time.time()) # 添加消息放入队列的时间 with self.lock: # print(message) self.queue.append(message) # 动态更新优先级映射 if self.auto_update_alarm_priority: alarm_category = message['alarmCategory'] if alarm_category not in self.category_priority: unique_categories = sorted({msg['alarmCategory'] for msg in self.queue}) self.category_priority = {cat: idx for idx, cat in enumerate(unique_categories)} print(f"更新优先级映射: {self.category_priority}") def _clean_old_messages(self): """清理长期堆积的消息""" # print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') now = time.time() with self.lock: self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time]) logger.debug(f'清理后的队列长度: {len(self.queue)}') # def _get_next_message(self): # """按优先级和时间规则取出下一条消息""" # now = time.time() # with self.lock: # # 按照规则排序队列 # sorted_queue = sorted( # self.queue, # key=lambda x: ( # self.alarm_priority.get(x['alarmCategory'], 4), # 按 alarmCategory 优先级排序 # x['category_order'], # category_order 小的排前面 # x['timestamp'] # 时间靠前的排前面 # ) # ) # # 遍历排序后的队列,找到符合规则的消息 # for msg in sorted_queue: # alarm_category = msg['alarmCategory'] # if alarm_category not in self.last_sent_time or now - self.last_sent_time[alarm_category] > 10: # self.queue.remove(msg) # self.last_sent_time[alarm_category] = now # return msg # return None def _get_next_message(self): """按优先级和时间规则取出下一条消息""" now = time.time() with self.lock: # 按优先级依次检查 for priority in sorted(self.category_priority.values()): found_valid_message = False # 用于标记当前优先级是否有消息被处理 for msg in sorted( (m for m in self.queue if self.category_priority.get(m['alarmCategory'], 99) == priority), key=lambda x: (-x['timestamp'], x['category_order']) ): alarm_category = msg['alarmCategory'] # 检查是否符合发送条件 if alarm_category not in self.last_sent_time or now - self.last_sent_time[ alarm_category] > self.category_interval: self.queue.remove(msg) self.last_sent_time[alarm_category] = now return msg # 找到符合条件的消息立即返回 found_valid_message = True # 当前优先级存在消息但不符合条件 # 如果当前优先级的所有消息都被检查过且不符合条件,跳到下一个优先级 if not found_valid_message: continue return None # 如果所有优先级都没有消息符合条件,则返回 None def process_messages(self): while True: time.sleep(self.message_send_interval) self._clean_old_messages() # 清理长期堆积的消息 next_message = self._get_next_message() if next_message: self.send_message(next_message) def send_message(self, message): """发送报警消息""" print(f"发送报警消息: {message['alarmContent']} (类别: {message['alarmCategory']}, 时间: {message['timestamp']})") send_to_tcp(self.device_id, message['alarmSoundMessage']) # if self.tcp_manager: # asyncio.run_coroutine_threadsafe( # self.tcp_manager.send_message_to_device(device_id=self.device_id, # message=message['alarmSoundMessage'], # have_response=False), # self.main_loop) def send_immediate_command(self, command): """ 立即发送指定的指令,不经过消息队列。 参数: command: 要发送的指令内容,可以是字符串、字典或其他数据结构,依据你的实现而定。 """ print(f"立即发送指令: {command}") if self.tcp_manager: # 直接调用 tcp_manager 的发送方法,将指令传递过去 asyncio.run_coroutine_threadsafe( self.tcp_manager.send_message_to_device( device_id=self.device_id, message=command, have_response=False ), self.main_loop ) def delete_messages(self, condition): """ 删除队列中符合条件的报警消息。 参数: condition: 一个函数,接受一个消息字典,返回 True 表示该消息需要被删除。 例如:lambda msg: msg['alarmCategory'] == 1 """ with self.lock: original_length = len(self.queue) # 过滤掉满足 condition 的消息,保留其余消息 self.queue = deque([msg for msg in self.queue if not condition(msg)]) print(f"删除了 {original_length - len(self.queue)} 条消息,剩余 {len(self.queue)} 条消息。")