# safe-algo-pro / tcp / harmful_device_handler.py
import asyncio
import struct
import traceback
import json
import base64
import re
from datetime import datetime
from typing import Dict

from common.harmful_gas_manager import HarmfulGasManager
from common.http_utils import send_request_async
from common.global_logger import logger
from scene_handler.alarm_record_center import AlarmRecordCenter
from services.global_config import GlobalConfig


# Parser and alarm dispatcher for harmful-gas detector messages.
class HarmfulGasHandler:
    """Parse harmful-gas device messages, cache the readings and raise alarms.

    A message is expected to look like ``<deviceCode>{"sensorDatas":[...]}``.
    Each element of ``sensorDatas`` is decoded, stored through
    ``HarmfulGasManager.set_device_data`` and checked against the alarm
    limits below. The raw message may additionally be forwarded to an
    external URL, throttled per device.
    """

    # Unit code -> display label.
    gas_units = {
        0: "%LEL",
        1: "%VOL",
        2: "PPM",
        3: "umol/mol",
        4: "mg/m3",
        5: "ug/m3",
        6: "℃",
        7: "%"
    }

    # Decimal-places code -> human readable description.
    decimals = {
        0: "没有小数点",
        1: "有一位小数",
        2: "有两位小数",
        3: "有三位小数"
    }

    # Sensor status code -> human readable description.
    gas_statuses = {
        0: "预热",
        1: "正常",
        3: "传感器故障",
        5: "低限报警",
        6: "高限报警"
    }

    # Gas type code -> gas name.
    gas_types = {
        3: "硫化氢 (H2S)",
        4: "一氧化碳 (CO)",
        5: "氧气 (O2)",
        50: "可燃气体 (Ex)",
    }

    # Gas type code -> payload handed to the alarm record center.
    alarm_dict = {
        3: {
            'alarmType': '5',
            'alarmContent': '硫化氢浓度过高',
        },
        4: {
            'alarmType': '7',
            'alarmContent': '一氧化碳浓度过高',
        },
        5: {
            'alarmType': '4',
            'alarmContent': '氧气浓度过低',
        },
        50: {
            'alarmType': '8',
            'alarmContent': '可燃气体浓度过高',
        }
    }

    # Alarm limits, compared against the precision-adjusted reading.
    # A reading strictly above a high limit, or strictly below a low limit,
    # is an alarm. NOTE(review): the units of these limits are not stated
    # anywhere in this file - confirm against the device documentation.
    alarm_high_limits = {3: 10, 4: 30, 50: 10}
    alarm_low_limits = {5: 19.5}

    # Compiled once; DOTALL lets the payload span several lines.
    _MESSAGE_PATTERN = re.compile(
        r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$", re.DOTALL
    )

    def __init__(self, main_loop=None):
        """Create the handler.

        Args:
            main_loop: forwarded unchanged to ``AlarmRecordCenter``.
        """
        self._harmful_gas_manager = HarmfulGasManager()
        # device_code -> datetime of the last forwarded message.
        self._push_ts_dict = {}
        # Strong references to in-flight push tasks; the event loop itself
        # only keeps weak references to tasks.
        self._background_tasks = set()
        self.alarm_record_center = AlarmRecordCenter(main_loop=main_loop)

    async def parse(self, message) -> Dict:
        """Parse one device message, update cached readings and raise alarms.

        Every reading is handled independently: a malformed reading or a
        failed alarm upload is logged and the remaining readings are still
        processed. A failure while forwarding the raw message is logged and
        does not prevent alarm evaluation.

        Args:
            message: raw text received from the device.

        Returns:
            ``{}`` when the message cannot be parsed or any step failed.
            On full success nothing is returned (``None``), which matches
            the historical behaviour of this method.
        """
        message = message.replace('\r\n', '\n').replace('\r', '\n')

        match = self._MESSAGE_PATTERN.match(message)
        if not match:
            print('有害气体浓度解析异常')
            return {}

        device_code = match.group(1)
        # Re-wrap the captured '"sensorDatas":[...]' into a JSON object.
        sensor_data_str = "{" + match.group(2) + "}"

        try:
            sensor_data = json.loads(sensor_data_str).get('sensorDatas', [])
        except json.JSONDecodeError:
            logger.error(f"JSON解析错误: {message}")
            return {}

        had_error = False

        if sensor_data:
            # Forwarding is best-effort and must never block alarm detection.
            try:
                await self._push_data(device_code, message)
            except Exception as e:
                had_error = True
                self._log_parse_error(e)

        for data_item in sensor_data:
            try:
                gas_type, gas_data = self._parse_sensor_item(device_code, data_item)
                if gas_type is None or gas_data is None:
                    continue
                if self._is_data_alarm(device_code, gas_type, gas_data):
                    print(f"四合一浓度报警: {device_code}, {gas_type}, {gas_data}")
                    self._save_and_send_alarm(device_code, gas_type, gas_data)
            except Exception as e:
                # Keep going: one bad reading must not hide the others.
                had_error = True
                self._log_parse_error(e)

        try:
            print(f'更新四合一{device_code}数据 {self._harmful_gas_manager.get_device_all_data(device_code)}')
        except Exception as e:
            had_error = True
            self._log_parse_error(e)

        if had_error:
            return {}
        return None

    @staticmethod
    def _log_parse_error(error: Exception) -> None:
        """Log *error* with its traceback; call only from an ``except`` block."""
        logger.error(f"解析有害气体数据时出错: {error}")
        logger.error(traceback.format_exc())

    def _parse_sensor_item(self, device_code: str, data: Dict) -> tuple:
        """Decode one ``sensorDatas`` element and store it for the device.

        Args:
            device_code: device identifier taken from the message prefix.
            data: one decoded element of ``sensorDatas``.

        Returns:
            ``(gas_type, gas_data)``. On any error the problem is logged and
            ``(None, None)`` is returned so that callers can always unpack
            the result.
        """
        try:
            gas_dec = data.get("gas_dec")
            gas_type = data.get("gas_type")

            gas_data = {
                "flag": data.get("flag"),
                # Raw reading scaled by the number of decimal places.
                "gas_value": self._handle_precision(data.get("gas_value"), gas_dec),
                "gas_unit": self.gas_units.get(data.get("gas_unit"), "未知单位"),
                "gas_status": self.gas_statuses.get(data.get("gas_status"), "未知状态"),
                "gas_type": self.gas_types.get(gas_type, "未知气体"),
                "gas_type_code": gas_type,
                "precision": self.decimals.get(gas_dec, "未知精度"),
                'gas_ts': datetime.now()
            }

            self._harmful_gas_manager.set_device_data(device_code, gas_type, gas_data)
            return gas_type, gas_data
        except Exception as e:
            logger.error(f"解析传感器数据项时出错: {e}")
            return None, None

    def _is_data_alarm(self, device_code: str, gas_type: int, gas_data: Dict) -> bool:
        """Tell whether a reading breaches the alarm limit of its gas type.

        Args:
            device_code: device identifier (not used by the check itself).
            gas_type: gas type code of the reading.
            gas_data: decoded reading; only ``gas_value`` is read.

        Returns:
            ``True`` when the value is above the high limit or below the low
            limit configured for ``gas_type``. ``False`` for gas types
            without a limit and for missing or non-numeric values.
        """
        gas_value = gas_data.get('gas_value')
        # A missing/non-numeric reading cannot be compared; treat as no alarm
        # instead of raising and aborting the rest of the message.
        if not isinstance(gas_value, (int, float)):
            return False

        high_limit = self.alarm_high_limits.get(gas_type)
        if high_limit is not None:
            return gas_value > high_limit

        low_limit = self.alarm_low_limits.get(gas_type)
        if low_limit is not None:
            return gas_value < low_limit

        return False

    def _save_and_send_alarm(self, device_code: str, gas_type: int, gas_data: Dict) -> None:
        """Hand the alarm for ``gas_type`` to the alarm record center.

        Raises:
            KeyError: if ``gas_type`` has no entry in ``alarm_dict``.
        """
        self.alarm_record_center.upload_alarm_record(device_code=device_code,
                                                     alarm_dict=self.alarm_dict[gas_type],
                                                     alarm_value=gas_data['gas_value'])

    def _handle_precision(self, gas_value, gas_dec):
        """Scale a raw reading by its number of decimal places.

        Args:
            gas_value: raw reading as sent by the device.
            gas_dec: decimal-places code; 1, 2 or 3 divide the value by
                10, 100 or 1000. Any other code leaves the value unchanged.

        Returns:
            The scaled value, or ``gas_value`` itself when no scaling applies.
        """
        if gas_dec in (1, 2, 3):
            return gas_value / (10 ** int(gas_dec))
        return gas_value

    async def _push_data(self, device_code: str, message: str) -> None:
        """Forward the raw message to the configured URL, throttled per device.

        The message is base64-encoded and sent as ``{"content": ...}`` in a
        background task. Nothing is sent when no push URL is configured or
        when the last push for this device is not older than the configured
        interval (compared in seconds).
        """
        global_config = GlobalConfig()
        harmful_push_config = global_config.get_harmful_gas_push_config()

        if not (harmful_push_config and harmful_push_config.push_url):
            logger.debug('no harmful push config')
            return

        last_ts = self._push_ts_dict.get(device_code)
        current_time = datetime.now()

        if last_ts is not None and \
                (current_time - last_ts).total_seconds() <= harmful_push_config.push_interval:
            return

        encoded_string = base64.b64encode(message.encode('utf-8')).decode('utf-8')
        push_message = {"content": encoded_string}

        task = asyncio.create_task(
            send_request_async(harmful_push_config.push_url, push_message)
        )
        # Hold a reference until the task finishes so it cannot be
        # garbage-collected while still running.
        self._background_tasks.add(task)
        task.add_done_callback(self._on_push_done)

        self._push_ts_dict[device_code] = current_time

    def _on_push_done(self, task) -> None:
        """Drop the finished push task and log the exception it raised, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(f"推送有害气体数据失败: {error}")