import asyncio
from collections import deque
from datetime import datetime

from common.byte_utils import format_bytes
from common.consts import TREE_COMMAND
from common.global_logger import logger
from common.http_utils import send_request_async
from db.database import get_db
from entity.data_gas import DataGas
from services.data_gas_service import DataGasService
from services.global_config import GlobalConfig


def parse_gas_data(data):
    """Decode one gas-sensor frame.

    Frame layout as read by this function (0-based byte offsets):

    * 0-5   device identifier, rendered as 12 upper-case hex digits
    * 6-7   frame header, must equal ``AA 01``
    * 8-10  gas concentration, 24-bit big-endian unsigned integer
    * 11    laser intensity level
    * 12    checksum: low 8 bits of the sum of bytes 7..11

    Args:
        data: Raw bytes received from the device.

    Returns:
        ``None`` when the frame is shorter than 13 bytes or the header does
        not match; otherwise a dict with the keys ``device_code``,
        ``gas_value``, ``laser_intensity_level`` and ``checksum_valid``.

    Raises:
        ValueError: If the concentration exceeds 99999, the intensity level
            is outside 0..14, or the checksum does not match.
    """
    # Frames that are too short or carry the wrong header are silently ignored.
    if len(data) < 13:
        return None
    if data[6:8] != b'\xAA\x01':
        return None

    device_id = ''.join(f'{byte:02X}' for byte in data[:6])

    # Three bytes, most significant first.
    gas_concentration = int.from_bytes(data[8:11], byteorder="big")
    if gas_concentration > 99999:
        raise ValueError("燃气浓度值超出范围")

    intensity_level = data[11]
    if not (0 <= intensity_level <= 14):
        raise ValueError("激光光强等级超出范围")

    checksum_received = data[12]
    checksum_expected = sum(data[7:12]) & 0xFF  # keep only the low 8 bits
    if checksum_received != checksum_expected:
        raise ValueError(
            f"校验和不匹配: 预期 {checksum_expected:02X}, 实际 {checksum_received:02X}"
        )

    return {
        "device_code": device_id,
        "gas_value": int(gas_concentration),
        "laser_intensity_level": intensity_level,
        # Always True at this point: a mismatch has already raised above.
        "checksum_valid": True,
    }


class TcpClientConnector:
    """Asyncio TCP client that polls a device for gas readings.

    ``connect()`` opens the socket, starts a background task that drains the
    outgoing message queue, and then periodically enqueues the gas query
    command. Responses are parsed, stored through ``DataGasService`` and
    optionally pushed to a configured URL.
    """

    def __init__(self, ip, port, query_interval=3, reconnect_interval=5, timeout=5):
        """Store connection settings; no I/O is performed here.

        Args:
            ip: Device host.
            port: Device TCP port.
            query_interval: Seconds between two gas query commands.
            reconnect_interval: Seconds to wait after a failed connection attempt.
            timeout: Seconds allowed for connecting and for reading a response.
        """
        self.ip = ip
        self.port = port
        self.reader = None
        self.writer = None
        self.query_interval = query_interval
        self.reconnect_interval = reconnect_interval
        self.timeout = timeout
        self.is_connected = False
        self.is_reconnecting = False
        # A deque, because the queue is used with append()/popleft() and a
        # truthiness test; asyncio.Queue offers none of these.
        self.message_queue = deque()
        self.gas_task = None
        self.read_lock = asyncio.Lock()
        self.push_ts_dict = {}  # device_code -> datetime of the last push
        # Created lazily inside a coroutine so it belongs to the running loop.
        self._connect_lock = None
        # Strong references to fire-and-forget tasks (see gas_push).
        self._background_tasks = set()

    def _get_connect_lock(self):
        """Return the lock serialising connection setup, creating it on first use."""
        if self._connect_lock is None:
            self._connect_lock = asyncio.Lock()
        return self._connect_lock

    def _ensure_queue_task(self):
        """Start the queue consumer task if it is missing or has finished."""
        if self.gas_task is None or self.gas_task.done():
            self.gas_task = asyncio.create_task(self.process_message_queue())

    async def _open_until_connected(self):
        """Retry ``open_connection`` until it succeeds.

        Returns immediately when already connected. The caller is expected to
        hold the connect lock so that only one socket is opened at a time.
        """
        while not self.is_connected:
            try:
                logger.info(f"正在连接到 {self.ip}:{self.port}...")
                self.reader, self.writer = await asyncio.wait_for(
                    asyncio.open_connection(self.ip, self.port),
                    timeout=self.timeout,
                )
                self.is_connected = True
                logger.info(f"已连接到 {self.ip}:{self.port}")
            except (asyncio.TimeoutError, ConnectionRefusedError, OSError) as e:
                logger.error(f"连接到 {self.ip}:{self.port} 失败,错误: {e}")
                logger.info(f"{self.reconnect_interval} 秒后将重连到 {self.ip}:{self.port}")
                await asyncio.sleep(self.reconnect_interval)

    async def connect(self):
        """Connect to the device and keep querying it.

        Returns immediately if the client is already connected. Otherwise it
        connects (retrying on failure), makes sure the queue consumer task is
        running, and runs the periodic query loop. When that loop ends because
        the connection dropped, the whole sequence starts over.
        """
        while not self.is_connected:
            async with self._get_connect_lock():
                # No-op if reconnect() restored the link while we were waiting.
                await self._open_until_connected()
            self._ensure_queue_task()
            await self.start_gas_query()

    async def reconnect(self):
        """Drop the current connection and open a new one.

        Only the socket is re-established; the query loop is deliberately not
        started here. This method is called from the queue consumer, and
        entering the endless query loop from there would stop the consumer
        from ever sending another message.
        """
        if self.is_reconnecting:
            logger.info("Reconnection is already in progress...")
            return

        self.is_reconnecting = True
        try:
            stale_writer = self.writer
            async with self._get_connect_lock():
                if self.is_connected and self.writer is not stale_writer:
                    # Another task replaced the connection while we waited
                    # for the lock; tearing it down again would be wasteful.
                    return
                await self.disconnect()
                logger.info(f"Reconnecting to {self.ip}:{self.port} after {self.reconnect_interval} seconds")
                await self._open_until_connected()
        finally:
            # Reset even on error/cancellation so later reconnects are not blocked.
            self.is_reconnecting = False

    async def disconnect(self):
        """Close the connection, if any, and clear the stream references."""
        if self.writer:
            logger.info(f"Disconnecting from {self.ip}:{self.port}...")
            try:
                self.writer.close()
                await self.writer.wait_closed()
            except Exception as e:
                logger.error(f"Error while disconnecting: {e}")
            finally:
                self.reader = None
                self.writer = None
                self.is_connected = False
                logger.info(f"Disconnected from {self.ip}:{self.port}")

    async def start_gas_query(self):
        """Enqueue the gas query command every ``query_interval`` seconds.

        Runs for as long as the client is connected. A connection error while
        enqueuing triggers a reconnect, after which querying continues.
        """
        logger.info(f"Start querying gas from {self.ip}...")
        while self.is_connected:
            try:
                await self.send_message(TREE_COMMAND.GAS_QUERY, have_response=True)
            except (ConnectionResetError, asyncio.IncompleteReadError) as e:
                logger.error(f"Error during query for {self.ip}:{self.port}: {e}")
                await self.reconnect()
                continue
            await asyncio.sleep(self.query_interval)

    async def parse_response(self, data):
        """Parse a device response, persist the reading and push it.

        Any failure is logged and swallowed so that a bad frame or a storage
        error does not interrupt the send/receive loop.
        """
        logger.info(f"Received data from {self.ip}:{self.port}: {format_bytes(data)}")
        try:
            res = parse_gas_data(data)
            if res:
                logger.info(res)
                async for db in get_db():
                    data_gas_service = DataGasService(db)
                    data_gas = DataGas(
                        device_code=res['device_code'],
                        gas_value=res['gas_value'],
                    )
                    await data_gas_service.add_data_gas(data_gas)
                    await self.gas_push(data_gas)
        except Exception as e:
            logger.error(f"Parse and save gas data failed: {e}")

    async def gas_push(self, data_gas):
        """Push a reading to the configured URL, rate-limited per device.

        Nothing is sent when no push URL is configured or when the last push
        for this device happened within ``push_interval`` seconds.
        """
        gas_push_config = GlobalConfig().get_gas_push_config()
        if not (gas_push_config and gas_push_config.push_url):
            return

        last_ts = self.push_ts_dict.get(data_gas.device_code)
        current_time = datetime.now()
        if last_ts is None or (current_time - last_ts).total_seconds() > gas_push_config.push_interval:
            task = asyncio.create_task(
                send_request_async(gas_push_config.push_url, data_gas.json())
            )
            # The event loop keeps only weak references to tasks; hold a
            # strong one until the request finishes so it cannot be collected.
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
            self.push_ts_dict[data_gas.device_code] = current_time

    async def send_message(self, message: bytes, have_response=True):
        """Add a message to the queue for sending."""
        self.message_queue.append((message, have_response))
        logger.info(f"Message enqueued for {self.ip}:{self.port} {format_bytes(message)}")

    async def process_message_queue(self):
        """Send queued messages one at a time while connected."""
        while self.is_connected:
            if self.message_queue:
                message, have_response = self.message_queue.popleft()
                await self._send_message_with_retry(message, have_response)
            else:
                await asyncio.sleep(1)  # Small delay to prevent busy-waiting

    async def _send_message_with_retry(self, message: bytes, have_response):
        """Send a message, reconnecting and retrying up to three times.

        When ``have_response`` is true, one response of up to 1024 bytes is
        read within ``timeout`` seconds and handed to ``parse_response``.
        """
        retry_attempts = 3
        for _ in range(retry_attempts):
            if not self.is_connected:
                await self.reconnect()
                if not self.is_connected:
                    logger.error("Reconnection failed")
                    continue  # Skip this attempt if reconnection fails

            try:
                if self.writer is None or self.writer.is_closing():
                    raise ConnectionResetError("No active connection or writer is closing")

                self.writer.write(message)
                await self.writer.drain()
                logger.info(f"Sent message to {self.ip}:{self.port}: {message}")

                if have_response:
                    async with self.read_lock:  # Ensure only one coroutine reads
                        data = await asyncio.wait_for(
                            self.reader.read(1024), timeout=self.timeout
                        )
                    if not data:
                        # read() returns b"" only at EOF: the peer closed the
                        # connection, so reconnect instead of parsing nothing.
                        raise ConnectionResetError("Connection closed by peer")
                    await self.parse_response(data)
                return  # Exit loop on success
            except (asyncio.TimeoutError, ConnectionResetError, asyncio.IncompleteReadError,
                    RuntimeError, BrokenPipeError, OSError, EOFError,
                    ConnectionAbortedError, ConnectionRefusedError):
                logger.exception("Failed to send message")
                self.is_connected = False  # Mark connection as disconnected
                await self.reconnect()

        logger.error("Max retry attempts reached, message sending failed")


if __name__ == '__main__':
    # To run against a live device instead:
    #   client = TcpClientConnector(ip="127.0.0.1", port=12345)
    #   asyncio.run(client.connect())

    # Sample data
    data = b'\x07\x00\x01\x00\x01\xaa\x01\x00"0\r`'
    result = parse_gas_data(data)
    print(result)