Newer
Older
safe-algo-pro / tcp / tcp_server.py
import asyncio
from datetime import datetime
import re
import json

from common.harmful_gas_manager import HarmfulGasManager
from common.http_utils import send_request_async
from services.global_config import GlobalConfig

# Address and port passed to asyncio.start_server in start_server().
HOST = '0.0.0.0'
PORT = 12345

# Store that parse_sensor_data() writes decoded readings into.
harmful_gas_manager = HarmfulGasManager()
# Maps device code -> datetime at which data_push() last scheduled a push.
push_ts_dict = {}

# Lookup tables that translate the numeric codes found in a sensor reading
# into display text. Callers supply their own fallback for unknown codes.

# gas_unit code -> unit label (codes are consecutive, starting at 0).
gas_units = dict(enumerate([
    "%LEL",
    "%VOL",
    "PPM",
    "umol/mol",
    "mg/m3",
    "ug/m3",
    "℃",
    "%",
]))

# gas_dec code -> description of how many decimal places the value carries.
decimals = dict(enumerate([
    "没有小数点",
    "有一位小数",
    "有两位小数",
    "有三位小数",
]))

# gas_status code -> status description (codes are not consecutive).
gas_statuses = dict([
    (0, "预热"),
    (1, "正常"),
    (3, "传感器故障"),
    (5, "低限报警"),
    (6, "高限报警"),
])

# gas_type code -> gas name.
gas_types = dict([
    (3, "硫化氢 (H2S)"),
    (4, "一氧化碳 (CO)"),
    (5, "氧气 (O2)"),
    (50, "可燃气体 (Ex)"),
])


def handle_precision(gas_value, gas_dec):
    """Scale a raw reading according to its decimal-places code.

    A ``gas_dec`` of 1, 2 or 3 divides ``gas_value`` by 10, 100 or 1000
    respectively. Any other ``gas_dec`` (including 0) returns ``gas_value``
    unchanged.
    """
    for places, divisor in ((1, 10), (2, 100), (3, 1000)):
        if gas_dec == places:
            return gas_value / divisor
    return gas_value


# Example sensor-data parsing function
def parse_sensor_data(device_code, sensor_data):
    """Decode every raw reading in ``sensor_data`` and store it for the device.

    For each entry, the numeric unit/precision/status/type codes are mapped
    through the module lookup tables (with an "unknown" label when a code is
    absent), the value is scaled by handle_precision, and the decoded record
    is saved in harmful_gas_manager keyed by the entry's raw ``gas_type``
    code. After all entries are stored, the device's full data is printed.
    """
    field_names = ("flag", "gas_value", "gas_dec", "gas_status", "gas_type", "gas_unit")

    for entry in sensor_data:
        flag, raw_value, dec_code, status_code, type_code, unit_code = (
            entry.get(name) for name in field_names
        )

        # Translate the numeric codes into their descriptions.
        unit_label = gas_units.get(unit_code, "未知单位")
        precision_label = decimals.get(dec_code, "未知精度")
        status_label = gas_statuses.get(status_code, "未知状态")
        type_label = gas_types.get(type_code, "未知气体")

        # Apply the decimal-places code to obtain the actual concentration.
        scaled_value = handle_precision(raw_value, dec_code)

        record = dict(
            flag=flag,
            gas_value=scaled_value,
            gas_unit=unit_label,
            gas_status=status_label,
            gas_type=type_label,
            precision=precision_label,
            gas_ts=datetime.now(),
        )

        # Stored under the raw type code, not the translated name.
        harmful_gas_manager.set_device_data(device_code, type_code, record)

    print(harmful_gas_manager.get_device_all_data(device_code))


# Strong references to in-flight push tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_pending_push_tasks = set()


async def data_push(device_code, message):
    """Schedule a push of ``message`` to the configured URL, rate-limited per device.

    Reads the gas push configuration from GlobalConfig. When a push URL is
    configured and either no push was recorded for ``device_code`` yet or more
    than ``push_interval`` seconds have passed since the last one, a
    background task sending ``message`` is created and the device's timestamp
    in ``push_ts_dict`` is updated. The send is not awaited here.

    Args:
        device_code: Key used to track the last push time in ``push_ts_dict``.
        message: Payload handed to ``send_request_async`` unchanged.
    """
    harmful_push_config = GlobalConfig().get_gas_push_config()
    if not (harmful_push_config and harmful_push_config.push_url):
        print('no harmful push config')
        return

    last_ts = push_ts_dict.get(device_code)
    current_time = datetime.now()

    # Push only on the first message or once the configured interval elapsed.
    if last_ts is None or (current_time - last_ts).total_seconds() > harmful_push_config.push_interval:
        task = asyncio.create_task(send_request_async(harmful_push_config.push_url, message))
        # Hold the task until it completes, then drop the reference.
        _pending_push_tasks.add(task)
        task.add_done_callback(_pending_push_tasks.discard)
        push_ts_dict[device_code] = current_time  # record when this push was scheduled


# Expected frame: <alphanumeric device code>{"sensorDatas":[ ... ]}
# Compiled once instead of on every incoming message.
_HARMFUL_GAS_PATTERN = re.compile(
    r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$", re.DOTALL
)


async def handle_message(message):
    """Parse one device frame, store its readings and trigger a push.

    Line endings are normalised to ``\\n`` first. A frame must consist of an
    alphanumeric device code immediately followed by a JSON object holding a
    ``sensorDatas`` array. Frames that do not match, or whose JSON part is
    invalid, are reported on stdout and ignored rather than raising, so one
    bad frame does not tear down the client connection.

    Args:
        message: Decoded text received from the client.
    """
    message = message.replace('\r\n', '\n').replace('\r', '\n')

    match = _HARMFUL_GAS_PATTERN.match(message)
    if not match:
        print("无法解析消息")
        return

    device_code = match.group(1)  # device code prefix
    print(f"设备号: {device_code}")

    # The regex only checks the outer shape; the body may still be bad JSON.
    try:
        sensor_data_json = json.loads("{" + match.group(2) + "}")
    except json.JSONDecodeError as exc:
        print(f"无法解析消息: {exc}")
        return

    sensor_data = sensor_data_json.get('sensorDatas')
    if sensor_data:
        parse_sensor_data(device_code, sensor_data)
        # The normalised frame is forwarded as-is.
        await data_push(device_code, message)


# Handle a client connection
async def handle_client(reader, writer):
    """Serve one TCP client: read chunks, decode them and hand them on.

    Reads up to 1024 bytes at a time until the peer closes the connection
    (an empty read) or the connection fails. Each chunk is decoded as UTF-8
    and passed to handle_message. A chunk that is not valid UTF-8 is logged
    and skipped instead of aborting the connection. The writer is always
    closed on exit.

    Args:
        reader: Stream to read client data from.
        writer: Stream used for peer info and for closing the connection.
    """
    client_address = writer.get_extra_info('peername')
    print(f"新连接: {client_address}")

    try:
        while True:
            # NOTE(review): each read is treated as one complete message; TCP
            # does not guarantee message boundaries, so frames longer than
            # 1024 bytes or split across reads will fail to parse — confirm
            # the device's framing.
            data = await reader.read(1024)
            if not data:
                print(f"连接关闭: {client_address}")
                break

            try:
                message = data.decode('utf-8')
            except UnicodeDecodeError as exc:
                # Garbage bytes should not drop an otherwise healthy client.
                print(f"无法解码数据({client_address}): {exc}")
                continue
            print(f"收到数据({client_address}): {message}")

            await handle_message(message)

    except ConnectionError:
        # Covers reset, aborted and broken-pipe disconnects.
        print(f"客户端断开: {client_address}")
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # wait_closed() can re-raise the error that ended the connection;
            # the socket is gone either way, so there is nothing left to do.
            pass


# Main server coroutine
async def start_server():
    """Start listening on HOST:PORT and serve clients with handle_client."""
    listener = await asyncio.start_server(handle_client, HOST, PORT)
    print(f"服务器启动,监听地址: {HOST}:{PORT}")

    # The context manager closes the listener when serving stops.
    async with listener:
        await listener.serve_forever()


# Run the TCP server when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(start_server())