import copy
import json
import asyncio
from datetime import datetime

from common.byte_utils import format_bytes
from common.global_logger import logger
from common.http_utils import send_request_async
from db.database import get_db
from entity.data_gas import DataGas
from scene_handler.alarm_record_center import AlarmRecordCenter
from services.data_gas_service import DataGasService
from services.global_config import GlobalConfig


def parse_gas_data(data):
    """Parse one gas (methane) telemetry frame.

    Frame layout as read by this function (0-based byte offsets):

    * 0-5   device identifier, rendered as 12 upper-case hex digits
    * 6-7   frame header, must be ``AA 01``
    * 8-10  gas concentration, 24-bit big-endian (ppm.m according to the
            original author's notes)
    * 11    laser intensity level, accepted range 0-14
    * 12    checksum: low 8 bits of the sum of bytes 7-11

    Any bytes after offset 12 are ignored.

    Args:
        data: Raw frame bytes.

    Returns:
        ``None`` when the frame is shorter than 13 bytes or the header does
        not match; otherwise a dict with the keys ``device_code``,
        ``gas_value``, ``laser_intensity_level`` and ``checksum_valid``.

    Raises:
        ValueError: If the concentration exceeds 99999, the laser intensity
            level is outside 0-14, or the checksum does not match.
    """
    # Frames that are too short or carry a different header are rejected
    # by returning None rather than by raising.
    if len(data) < 13:
        return None
    if data[6:8] != b'\xAA\x01':
        return None

    # Device identifier: six bytes rendered as upper-case hex.
    device_id = ''.join(f'{byte:02X}' for byte in data[:6])

    # Three concentration bytes, most significant first.
    gas_concentration = int.from_bytes(data[8:11], "big")
    if gas_concentration > 99999:
        raise ValueError("燃气浓度值超出范围")

    laser_level = data[11]
    if not (0 <= laser_level <= 14):
        raise ValueError("激光光强等级超出范围")

    # Checksum covers bytes 7..11 (second header byte through the laser
    # level); only the low 8 bits of the sum are compared.
    checksum_received = data[12]
    checksum_calculated = sum(data[7:12]) & 0xFF
    if checksum_received != checksum_calculated:
        raise ValueError(f"校验和不匹配: 预期 {checksum_calculated:02X}, 实际 {checksum_received:02X}")

    return {
        "device_code": device_id,
        "gas_value": int(gas_concentration),
        "laser_intensity_level": laser_level,
        # Always True here: a mismatch has already raised above.
        "checksum_valid": checksum_received == checksum_calculated,
    }


class GasDataHandler:
    """Business layer for gas data: parse frames, persist, push and alarm."""

    # Fields handed to the alarm record center for every gas alarm.
    alarm_dict = {
        'alarmType': '1',
        'alarmContent': '甲烷浓度超限',
    }

    # Readings strictly greater than this value raise an alarm.
    gas_alarm_threshold = 100

    def __init__(self, main_loop=None):
        """Create a handler.

        Args:
            main_loop: Forwarded unchanged to ``AlarmRecordCenter``.
        """
        # device_code -> datetime of the most recent push for that device.
        self.last_push = {}
        # Strong references to in-flight push tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._push_tasks = set()
        self.alarm_record_center = AlarmRecordCenter(main_loop=main_loop)

    async def handle_data(self, data: bytes):
        """Parse a raw frame, store the reading, then push and check alarms.

        Frames for which ``parse_gas_data`` returns ``None`` are ignored.
        Every exception raised along the way is logged and swallowed so the
        caller is never interrupted.

        Args:
            data: Raw frame bytes.
        """
        try:
            res = parse_gas_data(data)
            if res:
                logger.info(f"解析甲烷数据:{format_bytes(data)} {res}")
                async for db in get_db():
                    data_gas_service = DataGasService(db)
                    data_gas = DataGas(
                        device_code=res['device_code'],
                        gas_value=res['gas_value']
                    )
                    await data_gas_service.add_data_gas(data_gas)
                    self.push_data(data_gas)
                    self.handle_alarm(data_gas)
        except Exception as e:
            logger.exception(f"处理甲烷数据出错: {e}")

    def push_data(self, data_gas):
        """Schedule a push of ``data_gas`` to the configured URL.

        Nothing happens unless a push configuration exists with a non-empty
        ``push_url`` and a positive ``push_interval``. Pushes are throttled
        per device: a new one is scheduled only when more than
        ``push_interval`` seconds have passed since the previous push for
        the same ``device_code``.

        Must be called while an event loop is running, because the request
        is scheduled with ``asyncio.create_task``.

        Args:
            data_gas: The stored reading; its JSON form (without ``id``) is
                the request payload.
        """
        gas_push_config = GlobalConfig().get_gas_push_config()
        if not (gas_push_config and gas_push_config.push_url and gas_push_config.push_interval > 0):
            return

        last_ts = self.last_push.get(data_gas.device_code)
        current_time = datetime.now()
        if last_ts is not None and (current_time - last_ts).total_seconds() <= gas_push_config.push_interval:
            return

        send_data = json.loads(data_gas.json())
        # The id is dropped from the payload; tolerate records that
        # serialize without one.
        send_data.pop("id", None)

        task = asyncio.create_task(send_request_async(gas_push_config.push_url, send_data))
        # Hold a reference until the task completes, then release it.
        self._push_tasks.add(task)
        task.add_done_callback(self._push_tasks.discard)

        self.last_push[data_gas.device_code] = current_time

    def handle_alarm(self, data_gas):
        """Upload an alarm record when the reading exceeds the threshold.

        Args:
            data_gas: The reading to check; ``gas_value`` is compared with
                ``gas_alarm_threshold``.
        """
        if data_gas.gas_value > self.gas_alarm_threshold:
            logger.info(f"甲烷浓度超限报警: {data_gas}")
            self.alarm_record_center.upload_alarm_record(
                device_code=data_gas.device_code,
                alarm_dict=self.alarm_dict,
                alarm_value=data_gas.gas_value,
            )
        else:
            logger.info(f"甲烷浓度正常: {data_gas}")