import asyncio from datetime import datetime import re import json from common.harmful_gas_manager import HarmfulGasManager from common.http_utils import send_request_async from services.global_config import GlobalConfig HOST = '0.0.0.0' PORT = 12345 harmful_gas_manager = HarmfulGasManager() push_ts_dict = {} gas_units = { 0: "%LEL", 1: "%VOL", 2: "PPM", 3: "umol/mol", 4: "mg/m3", 5: "ug/m3", 6: "℃", 7: "%" } decimals = { 0: "没有小数点", 1: "有一位小数", 2: "有两位小数", 3: "有三位小数" } gas_statuses = { 0: "预热", 1: "正常", 3: "传感器故障", 5: "低限报警", 6: "高限报警" } gas_types = { 3: "硫化氢 (H2S)", 4: "一氧化碳 (CO)", 5: "氧气 (O2)", 50: "可燃气体 (Ex)", } def handle_precision(gas_value, gas_dec): if gas_dec == 0: return gas_value elif gas_dec == 1: return gas_value / 10 elif gas_dec == 2: return gas_value / 100 elif gas_dec == 3: return gas_value / 1000 else: return gas_value # 示例数据解析函数 def parse_sensor_data(device_code, sensor_data): for data in sensor_data: flag = data.get("flag") gas_value = data.get("gas_value") gas_dec = data.get("gas_dec") gas_status = data.get("gas_status") gas_type = data.get("gas_type") gas_unit = data.get("gas_unit") # 获取单位、精度、状态和气体类型的描述 unit = gas_units.get(gas_unit, "未知单位") precision = decimals.get(gas_dec, "未知精度") status = gas_statuses.get(gas_status, "未知状态") gas_type_name = gas_types.get(gas_type, "未知气体") # 格式化气体浓度(根据精度进行转换) gas_value = handle_precision(gas_value, gas_dec) gas_data = { "flag": flag, "gas_value": gas_value, "gas_unit": unit, "gas_status": status, "gas_type": gas_type_name, "precision": precision, 'gas_ts': datetime.now() } harmful_gas_manager.set_device_data(device_code, gas_type, gas_data) print(harmful_gas_manager.get_device_all_data(device_code)) async def data_push(device_code, message): global_config = GlobalConfig() harmful_push_config = global_config.get_gas_push_config() if harmful_push_config and harmful_push_config.push_url: last_ts = push_ts_dict.get(device_code) current_time = datetime.now() # 检查是否需要推送数据 if last_ts is None or (current_time - last_ts).total_seconds() > harmful_push_config.push_interval: asyncio.create_task(send_request_async(harmful_push_config.push_url, message)) push_ts_dict[device_code] = current_time # 更新推送时间戳 else: print('no harmful push config') async def handle_message(message): message = message.replace('\r\n', '\n').replace('\r', '\n') # 处理消息 harmful_gas_pattern = r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$" match = re.match(harmful_gas_pattern, message, re.DOTALL) if match: device_code = match.group(1) # 设备号 print(f"设备号: {device_code}") sensor_data_str = "{" + match.group(2) + "}" # JSON数组部分 sensor_data_json = json.loads(sensor_data_str) if sensor_data_json.get('sensorDatas'): sensor_data = sensor_data_json.get('sensorDatas') parse_sensor_data(device_code, sensor_data) await data_push(device_code, message) else: print("无法解析消息") # 处理客户端连接 async def handle_client(reader, writer): client_address = writer.get_extra_info('peername') print(f"新连接: {client_address}") try: while True: # 接收数据 data = await reader.read(1024) if not data: print(f"连接关闭: {client_address}") break message = data.decode('utf-8') print(f"收到数据({client_address}): {message}") await handle_message(message) except ConnectionResetError: print(f"客户端断开: {client_address}") finally: writer.close() await writer.wait_closed() # 主服务器函数 async def start_server(): server = await asyncio.start_server(handle_client, HOST, PORT) print(f"服务器启动,监听地址: {HOST}:{PORT}") async with server: await server.serve_forever() if __name__ == "__main__": asyncio.run(start_server())