Newer
Older
safe-algo-pro / tcp / gas_device_handler.py
import copy
import json
import asyncio
from datetime import datetime

from common.byte_utils import format_bytes
from common.global_logger import logger
from common.http_utils import send_request_async
from db.database import get_db
from entity.data_gas import DataGas
from scene_handler.alarm_record_center import AlarmRecordCenter
from services.data_gas_service import DataGasService
from services.global_config import GlobalConfig


def parse_gas_data(data):
    # 数据长度检查,确保最小长度符合协议要求
    if len(data) < 13:
        return None
        # raise ValueError("数据长度不足,无法解析")

    # 检查帧头(AA 01)
    if data[6:8] != b'\xAA\x01':
        return None
        # raise ValueError("帧头不匹配")


    # 解析设备编号(UU VV WW XX YY ZZ)
    device_id = ''.join(f'{byte:02X}' for byte in data[:6])



    # 解析 GG, HH, II 字节,计算燃气浓度值 (ppm.m)
    GG = data[8]
    HH = data[9]
    II = data[10]
    gas_concentration = GG * 65536 + HH * 256 + II
    if gas_concentration > 99999:
        raise ValueError("燃气浓度值超出范围")

    # 解析激光光强等级 JJ
    JJ = data[11]
    if not (0 <= JJ <= 14):
        raise ValueError("激光光强等级超出范围")

    # 校验和 SU 验证,从第8到第12字节累加
    SU_received = data[12]
    SU_calculated = sum(data[7:12]) & 0xFF  # 取累加值的低8位
    if SU_received != SU_calculated:
        raise ValueError(f"校验和不匹配: 预期 {SU_calculated:02X}, 实际 {SU_received:02X}")

    # 返回解析的结果
    return {
        "device_code": device_id,
        "gas_value": int(gas_concentration),
        "laser_intensity_level": JJ,
        "checksum_valid": SU_received == SU_calculated
    }


# 业务处理层:解析气体数据、保存数据库、推送数据
class GasDataHandler:

    alarm_dict = {
        'alarmType': '1',
        'alarmContent': '甲烷浓度超限',
    }

    def __init__(self, main_loop=None):
        self.last_push = {}
        self.alarm_record_center = AlarmRecordCenter(main_loop=main_loop)

    async def handle_data(self, data: bytes):
        try:
            res = parse_gas_data(data)
            if res:
                logger.info(f"解析甲烷数据:{format_bytes(data)} {res}")
                async for db in get_db():
                    data_gas_service = DataGasService(db)
                    data_gas = DataGas(
                        device_code=res['device_code'],
                        gas_value=res['gas_value']
                    )
                    await data_gas_service.add_data_gas(data_gas)
                    self.push_data(data_gas)
                    self.handle_alarm(data_gas)
        except Exception as e:
            logger.exception(f"处理甲烷数据出错: {e}")

    def push_data(self, data_gas):
        global_config = GlobalConfig()
        gas_push_config = global_config.get_gas_push_config()
        if gas_push_config and gas_push_config.push_url and gas_push_config.push_interval > 0:
            last_ts = self.last_push.get(data_gas.device_code)
            current_time = datetime.now()

            # 检查是否需要推送数据
            if last_ts is None or (current_time - last_ts).total_seconds() > gas_push_config.push_interval:
                send_data = json.loads(copy.deepcopy(data_gas.json()))
                send_data.pop("id")
                asyncio.create_task(send_request_async(gas_push_config.push_url, send_data))
                self.last_push[data_gas.device_code] = current_time  # 更新推送时间戳

    def handle_alarm(self, data_gas):
        if data_gas.gas_value > 100:
            logger.info(f"甲烷浓度超限报警: {data_gas}")
            self.alarm_record_center.upload_alarm_record(device_code=data_gas.device_code,
                                                         alarm_dict=self.alarm_dict,
                                                         alarm_value=data_gas.gas_value)
        else:
            logger.info(f"甲烷浓度正常: {data_gas}")