Newer
Older
safe-algo-pro / tcp / tcp_client_connector.py
import asyncio
from collections import deque
from datetime import datetime

from common.byte_utils import format_bytes
from common.consts import TREE_COMMAND
from common.global_logger import logger
from common.http_utils import send_request_async
from db.database import get_db
from entity.data_gas import DataGas
from services.data_gas_service import DataGasService
from services.global_config import GlobalConfig


def parse_gas_data(data):
    # 数据长度检查,确保最小长度符合协议要求
    if len(data) < 13:
        return None
        # raise ValueError("数据长度不足,无法解析")

    # 检查帧头(AA 01)
    if data[6:8] != b'\xAA\x01':
        return None
        # raise ValueError("帧头不匹配")


    # 解析设备编号(UU VV WW XX YY ZZ)
    device_id = ''.join(f'{byte:02X}' for byte in data[:6])



    # 解析 GG, HH, II 字节,计算燃气浓度值 (ppm.m)
    GG = data[8]
    HH = data[9]
    II = data[10]
    gas_concentration = GG * 65536 + HH * 256 + II
    if gas_concentration > 99999:
        raise ValueError("燃气浓度值超出范围")

    # 解析激光光强等级 JJ
    JJ = data[11]
    if not (0 <= JJ <= 14):
        raise ValueError("激光光强等级超出范围")

    # 校验和 SU 验证,从第8到第12字节累加
    SU_received = data[12]
    SU_calculated = sum(data[7:12]) & 0xFF  # 取累加值的低8位
    if SU_received != SU_calculated:
        raise ValueError(f"校验和不匹配: 预期 {SU_calculated:02X}, 实际 {SU_received:02X}")

    # 返回解析的结果
    return {
        "device_code": device_id,
        "gas_value": int(gas_concentration),
        "laser_intensity_level": JJ,
        "checksum_valid": SU_received == SU_calculated
    }


class TcpClientConnector:
    def __init__(self, ip, port, query_interval=3, reconnect_interval=5, timeout=5):
        self.ip = ip
        self.port = port
        self.reader = None
        self.writer = None
        self.query_interval = query_interval  # 甲烷查询指令间隔
        self.reconnect_interval = reconnect_interval  # 重连间隔
        self.timeout = timeout  # 连接/发送超时时间
        self.is_connected = False  # 连接状态标志
        self.is_reconnecting = False
        self.message_queue = asyncio.Queue()  #deque()
        self.gas_task = None
        self.read_lock = asyncio.Lock()  # 添加锁
        self.push_ts_dict = {}

    async def connect(self):
        """连接到设备"""
        while not self.is_connected:
            try:
                logger.info(f"正在连接到 {self.ip}:{self.port}...")
                # 设置连接超时
                self.reader, self.writer = await asyncio.wait_for(
                    asyncio.open_connection(self.ip, self.port), timeout=self.timeout
                )
                self.is_connected = True
                logger.info(f"已连接到 {self.ip}:{self.port}")

                if self.gas_task is None:
                    self.gas_task = asyncio.create_task(self.process_message_queue())  # Start processing message queue

                # 一旦连接成功,开始发送查询指令
                await self.start_gas_query()
            except (asyncio.TimeoutError, ConnectionRefusedError, OSError) as e:
                logger.error(f"连接到 {self.ip}:{self.port} 失败,错误: {e}")
                logger.info(f"{self.reconnect_interval} 秒后将重连到 {self.ip}:{self.port}")
                await asyncio.sleep(self.reconnect_interval)

    async def reconnect(self):
        """处理断线重连"""
        if self.is_reconnecting:
            logger.info("Reconnection is already in progress...")
            return
        self.is_reconnecting = True
        await self.disconnect()  # 先断开现有连接
        logger.info(f"Reconnecting to {self.ip}:{self.port} after {self.reconnect_interval} seconds")
        # await asyncio.sleep(self.reconnect_interval)  # 等待n秒后重连
        await self.connect()
        self.is_reconnecting = False

    async def disconnect(self):
        """断开设备连接,清理资源"""
        if self.writer:
            logger.info(f"Disconnecting from {self.ip}:{self.port}...")
            try:
                self.writer.close()
                await self.writer.wait_closed()
            except Exception as e:
                logger.error(f"Error while disconnecting: {e}")
            finally:
                self.reader = None
                self.writer = None
                self.is_connected = False  # 设置连接状态为 False
                logger.info(f"Disconnected from {self.ip}:{self.port}")

    async def start_gas_query(self):
        """启动甲烷查询指令,每n秒发送一次"""
        try:
            logger.info(f"Start querying gas from {self.ip}...")
            while self.is_connected:
                await self.send_message(TREE_COMMAND.GAS_QUERY, have_response=True)
                await asyncio.sleep(self.query_interval)
        except (ConnectionResetError, asyncio.IncompleteReadError) as e:
            logger.error(f"Error during query for {self.ip}:{self.port}: {e}")
            await self.reconnect()

    async def parse_response(self, data):
        """解析设备返回的数据"""
        logger.info(f"Received data from {self.ip}:{self.port}: {format_bytes(data)}")
        try:
            res = parse_gas_data(data)
            if res:
                logger.info(res)
                async for db in get_db():
                    data_gas_service = DataGasService(db)
                    data_gas = DataGas(
                        device_code=res['device_code'],
                        gas_value=res['gas_value']
                    )

                    await data_gas_service.add_data_gas(data_gas)
                    await self.gas_push(data_gas)

        except Exception as e:
            logger.error(f"Parse and save gas data failed: {e}")

    async def gas_push(self, data_gas):
        global_config = GlobalConfig()
        gas_push_config = global_config.get_gas_push_config()
        if gas_push_config and gas_push_config.push_url:
            last_ts = self.push_ts_dict.get(data_gas.device_code)
            current_time = datetime.now()

            # 检查是否需要推送数据
            if last_ts is None or (current_time - last_ts).total_seconds() > gas_push_config.push_interval:
                asyncio.create_task(send_request_async(gas_push_config.push_url, data_gas.json()))
                self.push_ts_dict[data_gas.device_code] = current_time  # 更新推送时间戳

    async def send_message(self, message: bytes, have_response=True):
        """Add a message to the queue for sending"""
        self.message_queue.append((message, have_response))
        logger.info(f"Message enqueued for {self.ip}:{self.port} {format_bytes(message)}")

    async def process_message_queue(self):
        """Process messages in the queue, retrying on failures"""
        while self.is_connected:
            if self.message_queue:
                message, have_response = self.message_queue.popleft()
                await self._send_message_with_retry(message, have_response)
            else:
                await asyncio.sleep(1)  # Small delay to prevent busy-waiting

    async def _send_message_with_retry(self, message: bytes, have_response):
        """Send a message with retries on failure"""
        retry_attempts = 3  # Maximum retry attempts
        for _ in range(retry_attempts):
            if not self.is_connected:
                await self.reconnect()
                if not self.is_connected:
                    logger.error("Reconnection failed")
                    continue  # Skip this attempt if reconnection fails

            try:
                if self.writer is None or self.writer.is_closing():
                    raise ConnectionResetError("No active connection or writer is closing")

                self.writer.write(message)
                await self.writer.drain()
                logger.info(f"Sent message to {self.ip}:{self.port}: {message}")

                if have_response:
                    async with self.read_lock:  # Ensure only one coroutine reads
                        data = await asyncio.wait_for(self.reader.read(1024), timeout=self.timeout)
                        await self.parse_response(data)
                return  # Exit loop on success

            except (asyncio.TimeoutError, ConnectionResetError, asyncio.IncompleteReadError, RuntimeError,
                    BrokenPipeError, OSError, EOFError, ConnectionAbortedError, ConnectionRefusedError) as e:
                logger.exception("Failed to send message")
                self.is_connected = False  # Mark connection as disconnected
                await self.reconnect()

        logger.error("Max retry attempts reached, message sending failed")

    # async def send_message(self, message: bytes, have_response=True):
    #     """发送自定义消息的接口,供其他类调用"""
    #     try:
    #         # 检查连接状态
    #         if self.writer is None:
    #             raise ConnectionResetError("No active connection")
    #
    #         # 发送自定义消息
    #         self.writer.write(message)
    #         await self.writer.drain()  # 确保数据已发送
    #         logger.info(f"Sent message to {self.ip}:{self.port}: {message}")
    #
    #         # 可以根据需求选择是否接收响应
    #         if have_response:
    #             data = await asyncio.wait_for(self.reader.read(1024), timeout=self.timeout)
    #             # if not data:
    #             #     raise ConnectionResetError("Connection lost or no data received")
    #             await self.parse_response(data)
    #             return data  # 返回响应数据
    #         else:
    #             return None
    #     except asyncio.TimeoutError:
    #         logger.error(f"TimeoutError: No response from {self.ip}:{self.port} after {self.timeout} seconds")
    #         await self.reconnect()  # 如果超时则重新连接
    #     except (ConnectionResetError, asyncio.IncompleteReadError) as e:
    #         logger.error(f"Failed to send message: {e}")
    #         await self.reconnect()  # 重新连接设备


if __name__ == '__main__':
    # client = TcpClientConnector(ip="127.0.0.1", port=12345)
    # asyncio.run(client.connect())  # Run the asynchronous connect method

    # 示例数据
    data = b'\x07\x00\x01\x00\x01\xaa\x01\x00"0\r`'
    result = parse_gas_data(data)
    print(result)