import asyncio
import struct
import traceback
import json
import base64
import re
from datetime import datetime
from typing import Dict, Optional, Tuple

from common.harmful_gas_manager import HarmfulGasManager
from common.http_utils import send_request_async
from common.global_logger import logger
from scene_handler.alarm_record_center import AlarmRecordCenter
from services.global_config import GlobalConfig


# Harmful gas data parser
class HarmfulGasHandler:
    """Parse messages reported by harmful-gas detectors.

    A message has the form ``<deviceCode>{"sensorDatas":[ ... ]}``. Every
    entry of ``sensorDatas`` is decoded, stored through
    ``HarmfulGasManager``, checked against the alarm thresholds and, when a
    threshold is crossed, reported through ``AlarmRecordCenter``. The raw
    message is additionally forwarded (rate limited per device) to the push
    URL configured in ``GlobalConfig``.
    """

    # gas_unit code -> unit label
    gas_units = {
        0: "%LEL",
        1: "%VOL",
        2: "PPM",
        3: "umol/mol",
        4: "mg/m3",
        5: "ug/m3",
        6: "℃",
        7: "%",
    }

    # gas_dec code -> description of the number of decimal places
    decimals = {
        0: "没有小数点",
        1: "有一位小数",
        2: "有两位小数",
        3: "有三位小数",
    }

    # gas_status code -> status description
    gas_statuses = {
        0: "预热",
        1: "正常",
        3: "传感器故障",
        5: "低限报警",
        6: "高限报警",
    }

    # gas_type code -> gas name
    gas_types = {
        3: "硫化氢 (H2S)",
        4: "一氧化碳 (CO)",
        5: "氧气 (O2)",
        50: "可燃气体 (Ex)",
    }

    # gas_type code -> alarm payload handed to AlarmRecordCenter
    alarm_dict = {
        3: {
            'alarmType': '5',
            'alarmContent': '硫化氢浓度过高',
        },
        4: {
            'alarmType': '7',
            'alarmContent': '一氧化碳浓度过高',
        },
        5: {
            'alarmType': '4',
            'alarmContent': '氧气浓度过低',
        },
        50: {
            'alarmType': '8',
            'alarmContent': '可燃气体浓度过高',
        },
    }

    # gas_type code -> (threshold, alarm_when_above). The threshold is
    # compared with the precision-adjusted gas value; gas type 5 alarms when
    # the value drops BELOW its threshold, all others when it rises above.
    _alarm_thresholds = {
        3: (10, True),
        4: (30, True),
        5: (19.5, False),
        50: (10, True),
    }

    # Compiled once: device code, then a JSON object that contains only the
    # "sensorDatas" array.
    _message_pattern = re.compile(
        r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$", re.DOTALL
    )

    def __init__(self, main_loop=None):
        """Create the handler.

        Args:
            main_loop: Forwarded unchanged to ``AlarmRecordCenter``.
        """
        self._harmful_gas_manager = HarmfulGasManager()
        # device_code -> datetime of the last push to the external system
        self._push_ts_dict = {}
        # Strong references to in-flight push tasks. The event loop keeps
        # only weak references, so an unreferenced task may be collected
        # before it completes.
        self._background_tasks = set()
        self.alarm_record_center = AlarmRecordCenter(main_loop=main_loop)

    async def parse(self, message) -> Optional[Dict]:
        """Parse one harmful-gas device message and act on its readings.

        Args:
            message: Raw text of the form ``<deviceCode>{"sensorDatas":[...]}``.

        Returns:
            ``{}`` when the message does not match the expected format or an
            error occurs while handling it; ``None`` after the message was
            handled (no payload is returned on success).
        """
        # Normalise line endings before matching.
        message = message.replace('\r\n', '\n').replace('\r', '\n')

        match = self._message_pattern.match(message)
        if not match:
            print('有害气体浓度解析异常')
            return {}

        device_code = match.group(1)  # device code
        sensor_data_str = "{" + match.group(2) + "}"  # rebuilt JSON object

        try:
            sensor_data_json = json.loads(sensor_data_str)
            sensor_data = sensor_data_json.get('sensorDatas', [])
            if sensor_data:
                await self._push_data(device_code, message)
                for data_item in sensor_data:
                    gas_type, gas_data = self._parse_sensor_item(device_code, data_item)
                    # Skip items that could not be decoded; otherwise check
                    # the reading and report an alarm if needed.
                    if gas_type is None or gas_data is None:
                        continue
                    if self._is_data_alarm(device_code, gas_type, gas_data):
                        print(f"四合一浓度报警: {device_code}, {gas_type}, {gas_data}")
                        self._save_and_send_alarm(device_code, gas_type, gas_data)
                # NOTE(review): the original formatting was lost; this print
                # is assumed to run once per message after all items.
                print(f'更新四合一{device_code}数据 {self._harmful_gas_manager.get_device_all_data(device_code)}')
        except json.JSONDecodeError:
            logger.error(f"JSON解析错误: {message}")
            return {}
        except Exception as e:
            logger.error(f"解析有害气体数据时出错: {e}")
            logger.error(traceback.format_exc())
            return {}
        return None

    def _parse_sensor_item(self, device_code: str, data: Dict) -> Tuple[Optional[int], Optional[Dict]]:
        """Decode a single ``sensorDatas`` entry and store it.

        Args:
            device_code: Device the entry belongs to.
            data: One decoded JSON object from ``sensorDatas``.

        Returns:
            ``(gas_type, gas_data)`` on success, ``(None, None)`` when the
            entry cannot be decoded. A 2-tuple is always returned so the
            caller can unpack it safely.
        """
        try:
            flag = data.get("flag")
            gas_value = data.get("gas_value")
            gas_dec = data.get("gas_dec")
            gas_status = data.get("gas_status")
            gas_type = data.get("gas_type")
            gas_unit = data.get("gas_unit")

            # Human-readable unit, precision, status and gas name.
            unit = self.gas_units.get(gas_unit, "未知单位")
            precision = self.decimals.get(gas_dec, "未知精度")
            status = self.gas_statuses.get(gas_status, "未知状态")
            gas_type_name = self.gas_types.get(gas_type, "未知气体")

            # Scale the raw value according to its declared precision.
            gas_value = self._handle_precision(gas_value, gas_dec)

            gas_data = {
                "flag": flag,
                "gas_value": gas_value,
                "gas_unit": unit,
                "gas_status": status,
                "gas_type": gas_type_name,
                "gas_type_code": gas_type,
                "precision": precision,
                'gas_ts': datetime.now(),
            }

            self._harmful_gas_manager.set_device_data(device_code, gas_type, gas_data)
            return gas_type, gas_data
        except Exception as e:
            logger.error(f"解析传感器数据项时出错: {e}")
            return None, None

    def _is_data_alarm(self, device_code: str, gas_type: int, gas_data: Dict) -> bool:
        """Tell whether a decoded reading crosses its alarm threshold.

        Args:
            device_code: Device the reading belongs to (not used in the check).
            gas_type: Gas type code of the reading.
            gas_data: Decoded reading as produced by ``_parse_sensor_item``.

        Returns:
            ``True`` when the gas type has a threshold and the value is
            beyond it; ``False`` for unknown gas types and for readings
            whose value is missing or not numeric.
        """
        rule = self._alarm_thresholds.get(gas_type)
        if rule is None:
            return False

        gas_value = gas_data.get('gas_value')
        # A missing/non-numeric value cannot be compared; treat it as
        # "no alarm" instead of raising and aborting the whole message.
        if isinstance(gas_value, bool) or not isinstance(gas_value, (int, float)):
            return False

        threshold, alarm_when_above = rule
        if alarm_when_above:
            return gas_value > threshold
        return gas_value < threshold

    def _save_and_send_alarm(self, device_code: str, gas_type: int, gas_data: Dict) -> None:
        """Hand an alarm for ``gas_type`` to the alarm record center.

        Args:
            device_code: Device that produced the reading.
            gas_type: Gas type code; selects the entry of ``alarm_dict``.
            gas_data: Decoded reading; its ``gas_value`` is reported.
        """
        alarm_info = self.alarm_dict.get(gas_type)
        if alarm_info is None:
            # No alarm payload is defined for this gas type.
            logger.error(f"unknown gas type for alarm: {gas_type}")
            return
        self.alarm_record_center.upload_alarm_record(
            device_code=device_code,
            alarm_dict=alarm_info,
            alarm_value=gas_data['gas_value'],
        )

    def _handle_precision(self, gas_value, gas_dec):
        """Scale a raw gas value by its decimal-place code.

        Args:
            gas_value: Raw value reported by the device.
            gas_dec: Number of decimal places (1, 2 or 3 divide the value by
                10, 100 or 1000).

        Returns:
            The scaled value, or ``gas_value`` unchanged for any other
            ``gas_dec`` (including 0).
        """
        if gas_dec in (1, 2, 3):
            return gas_value / 10 ** gas_dec
        return gas_value

    async def _push_data(self, device_code: str, message: str) -> None:
        """Forward the raw message to the configured external system.

        The message is Base64-encoded (UTF-8) and sent as
        ``{"content": <encoded>}``. At most one push per device is started
        within ``push_interval`` seconds. Nothing is sent when no push URL
        is configured.

        Args:
            device_code: Device the message came from; key for rate limiting.
            message: Normalised raw message text.
        """
        global_config = GlobalConfig()
        harmful_push_config = global_config.get_harmful_gas_push_config()
        if not (harmful_push_config and harmful_push_config.push_url):
            logger.debug('no harmful push config')
            return

        last_ts = self._push_ts_dict.get(device_code)
        current_time = datetime.now()

        # Push only on the first message or once the interval has elapsed.
        if last_ts is not None and (current_time - last_ts).total_seconds() <= harmful_push_config.push_interval:
            return

        encoded_string = base64.b64encode(message.encode('utf-8')).decode('utf-8')
        push_message = {"content": encoded_string}

        # Fire-and-forget, but keep a reference until the task is done so it
        # cannot be garbage-collected while still running.
        task = asyncio.create_task(send_request_async(harmful_push_config.push_url, push_message))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

        self._push_ts_dict[device_code] = current_time  # remember last push time