Newer
Older
safe-algo-pro / tcp / tcp_client_connector.py
import asyncio
from datetime import datetime

from common.byte_utils import format_bytes
from common.consts import TREE_COMMAND
from common.global_logger import logger
from common.http_utils import send_request_async
from db.database import get_db
from entity.data_gas import DataGas
from services.data_gas_service import DataGasService
from services.global_config import GlobalConfig


def parse_gas_data(data):
    # 数据长度检查,确保最小长度符合协议要求
    if len(data) < 13:
        raise ValueError("数据长度不足,无法解析")

    # 解析设备编号(UU VV WW XX YY ZZ)
    device_id = ''.join(f'{byte:02X}' for byte in data[:6])

    # 检查帧头(AA 01)
    if data[6:8] != b'\xAA\x01':
        raise ValueError("帧头不匹配")

    # 解析 GG, HH, II 字节,计算燃气浓度值 (ppm.m)
    GG = data[8]
    HH = data[9]
    II = data[10]
    gas_concentration = GG * 65536 + HH * 256 + II
    if gas_concentration > 99999:
        raise ValueError("燃气浓度值超出范围")

    # 解析激光光强等级 JJ
    JJ = data[11]
    if not (0 <= JJ <= 14):
        raise ValueError("激光光强等级超出范围")

    # 校验和 SU 验证,从第8到第12字节累加
    SU_received = data[12]
    SU_calculated = sum(data[7:12]) & 0xFF  # 取累加值的低8位
    if SU_received != SU_calculated:
        raise ValueError(f"校验和不匹配: 预期 {SU_calculated:02X}, 实际 {SU_received:02X}")

    # 返回解析的结果
    return {
        "device_code": device_id,
        "gas_value": int(gas_concentration),
        "laser_intensity_level": JJ,
        "checksum_valid": SU_received == SU_calculated
    }


class TcpClientConnector:
    def __init__(self, ip, port, query_interval=3, reconnect_interval=5, timeout=5):
        self.ip = ip
        self.port = port
        self.reader = None
        self.writer = None
        self.query_interval = query_interval  # 甲烷查询指令间隔
        self.reconnect_interval = reconnect_interval  # 重连间隔
        self.timeout = timeout  # 连接/发送超时时间
        self.is_connected = False  # 连接状态标志

        self.push_ts_dict = {}

    async def connect(self):
        """连接到设备"""
        try:
            logger.info(f"Connecting to {self.ip}:{self.port}...")
            # 使用 asyncio.wait_for() 为连接设置超时时间
            self.reader, self.writer = await asyncio.wait_for(
                asyncio.open_connection(self.ip, self.port), timeout=self.timeout
            )
            self.is_connected = True
            logger.info(f"Connected to {self.ip}:{self.port}")
            # 一旦连接成功,开始发送查询指令
            await self.start_gas_query()
        except (asyncio.TimeoutError, ConnectionRefusedError, OSError) as e:
            logger.error(f"Failed to connect to {self.ip}:{self.port}, error: {e}")
            await self.reconnect()

    async def reconnect(self):
        """处理断线重连"""
        await self.disconnect()  # 先断开现有连接
        logger.info(f"Reconnecting to {self.ip}:{self.port} after {self.reconnect_interval} seconds")
        await asyncio.sleep(self.reconnect_interval)  # 等待n秒后重连
        await self.connect()

    async def disconnect(self):
        """断开设备连接,清理资源"""
        if self.writer:
            logger.info(f"Disconnecting from {self.ip}:{self.port}...")
            try:
                self.writer.close()
                await self.writer.wait_closed()
            except Exception as e:
                logger.error(f"Error while disconnecting: {e}")
            finally:
                self.reader = None
                self.writer = None
                self.is_connected = False  # 设置连接状态为 False
                logger.info(f"Disconnected from {self.ip}:{self.port}")

    async def start_gas_query(self):
        """启动甲烷查询指令,每n秒发送一次"""
        try:
            logger.info(f"Start querying gas from {self.ip}...")
            while self.is_connected:
                await self.send_message(TREE_COMMAND.GAS_QUERY, have_response=True)
                await asyncio.sleep(self.query_interval)
        except (ConnectionResetError, asyncio.IncompleteReadError) as e:
            logger.error(f"Error during query for {self.ip}:{self.port}: {e}")
            await self.reconnect()

    async def parse_response(self, data):
        """解析设备返回的数据"""
        logger.info(f"Received data from {self.ip}:{self.port}: {format_bytes(data)}")
        try:
            res = parse_gas_data(data)
            logger.info(res)
            async for db in get_db():
                data_gas_service = DataGasService(db)
                data_gas = DataGas(
                    device_code=res['device_code'],
                    gas_value=res['gas_value']
                )

                await data_gas_service.add_data_gas(data_gas)
                await self.gas_push(data_gas)

        except Exception as e:
            logger.error(f"Parse and save gas data failed: {e}")

    async def gas_push(self, data_gas):
        global_config = GlobalConfig()
        gas_push_config = global_config.get_gas_push_config()
        if gas_push_config and gas_push_config.push_url:
            last_ts = self.push_ts_dict.get(data_gas.device_code)
            current_time = datetime.now()

            # 检查是否需要推送数据
            if last_ts is None or (current_time - last_ts).total_seconds() > gas_push_config.push_interval:
                asyncio.create_task(send_request_async(gas_push_config.push_url, data_gas.json()))
                self.push_ts_dict[data_gas.device_code] = current_time  # 更新推送时间戳

    async def send_message(self, message: bytes, have_response=True):
        """发送自定义消息的接口,供其他类调用"""
        try:
            # 检查连接状态
            if self.writer is None:
                raise ConnectionResetError("No active connection")

            # 发送自定义消息
            self.writer.write(message)
            await self.writer.drain()  # 确保数据已发送
            logger.info(f"Sent message to {self.ip}:{self.port}: {message}")

            # 可以根据需求选择是否接收响应
            if have_response:
                data = await asyncio.wait_for(self.reader.read(1024), timeout=self.timeout)
                # if not data:
                #     raise ConnectionResetError("Connection lost or no data received")
                await self.parse_response(data)
                return data  # 返回响应数据
            else:
                return None
        except asyncio.TimeoutError:
            logger.error(f"TimeoutError: No response from {self.ip}:{self.port} after {self.timeout} seconds")
            await self.reconnect()  # 如果超时则重新连接
        except (ConnectionResetError, asyncio.IncompleteReadError) as e:
            logger.error(f"Failed to send message: {e}")
            await self.reconnect()  # 重新连接设备


if __name__ == '__main__':
    # client = TcpClientConnector(ip="127.0.0.1", port=12345)
    # asyncio.run(client.connect())  # Run the asynchronous connect method

    # 示例数据
    data = b'\x07\x00\x01\x00\x01\xaa\x01\x00"0\r`'
    result = parse_gas_data(data)
    print(result)