diff --git a/tcp/tcp_server.py b/tcp/tcp_server.py index 57011b5..e99bf0a 100644 --- a/tcp/tcp_server.py +++ b/tcp/tcp_server.py @@ -1,182 +1,107 @@ +# TCP服务器类 import asyncio -import base64 -from datetime import datetime -import re -import json - -from common.harmful_gas_manager import HarmfulGasManager -from common.http_utils import send_request_async -from services.global_config import GlobalConfig - -HOST = '0.0.0.0' -PORT = 9001 - -harmful_gas_manager = HarmfulGasManager() -push_ts_dict = {} - -gas_units = { - 0: "%LEL", - 1: "%VOL", - 2: "PPM", - 3: "umol/mol", - 4: "mg/m3", - 5: "ug/m3", - 6: "℃", - 7: "%" -} - -decimals = { - 0: "没有小数点", - 1: "有一位小数", - 2: "有两位小数", - 3: "有三位小数" -} - -gas_statuses = { - 0: "预热", - 1: "正常", - 3: "传感器故障", - 5: "低限报警", - 6: "高限报警" -} - -gas_types = { - 3: "硫化氢 (H2S)", - 4: "一氧化碳 (CO)", - 5: "氧气 (O2)", - 50: "可燃气体 (Ex)", -} +from typing import Dict, List, Callable, Optional, Any, Type +import traceback +from common.global_logger import logger -def handle_precision(gas_value, gas_dec): - if gas_dec == 0: - return gas_value - elif gas_dec == 1: - return gas_value / 10 - elif gas_dec == 2: - return gas_value / 100 - elif gas_dec == 3: - return gas_value / 1000 - else: - return gas_value +class TcpServer: + """TCP服务器,处理设备连接和数据接收""" + def __init__(self, host: str = "0.0.0.0", port: int = 9001): + self.host = host + self.port = port + self.server = None + self.clients: Dict[str, asyncio.StreamWriter] = {} + self.on_data_callbacks: List[Callable] = [] -# 示例数据解析函数 -def parse_sensor_data(device_code, sensor_data): - for data in sensor_data: - flag = data.get("flag") - gas_value = data.get("gas_value") - gas_dec = data.get("gas_dec") - gas_status = data.get("gas_status") - gas_type = data.get("gas_type") - gas_unit = data.get("gas_unit") + def register_data_callback(self, callback: Callable) -> None: + """注册数据回调函数""" + self.on_data_callbacks.append(callback) - # 获取单位、精度、状态和气体类型的描述 - unit = gas_units.get(gas_unit, "未知单位") - precision = decimals.get(gas_dec, "未知精度") - status = gas_statuses.get(gas_status, "未知状态") - gas_type_name = gas_types.get(gas_type, "未知气体") + async def start(self) -> None: + """启动TCP服务器""" + try: + self.server = await asyncio.start_server( + self._handle_client, self.host, self.port + ) + addr = self.server.sockets[0].getsockname() + logger.info(f"TCP服务器启动在 {addr}") - # 格式化气体浓度(根据精度进行转换) - gas_value = handle_precision(gas_value, gas_dec) + async with self.server: + await self.server.serve_forever() + except Exception as e: + logger.error(f"启动TCP服务器时出错: {e}") + logger.error(traceback.format_exc()) - gas_data = { - "flag": flag, - "gas_value": gas_value, - "gas_unit": unit, - "gas_status": status, - "gas_type": gas_type_name, - "precision": precision, - 'gas_ts': datetime.now() - } + async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + """处理客户端连接""" + addr = writer.get_extra_info("peername") + client_id = f"{addr[0]}:{addr[1]}" + self.clients[client_id] = writer + logger.info(f"客户端连接: {client_id}") - harmful_gas_manager.set_device_data(device_code, gas_type, gas_data) - print(harmful_gas_manager.get_device_all_data(device_code)) + try: + while True: + # 接收数据 + data = await reader.read(1024) + if not data: + logger.info(f"连接关闭: {client_id}") + break + try: + message = data.decode('utf-8') + logger.info(f"收到数据({client_id}): {repr(message)}") + await self._process_data(message) + except UnicodeDecodeError as e: + logger.error(f"无法解析消息来自 {client_id}: {e}") + logger.debug(f"原始数据: {data}") + except ConnectionResetError: + logger.info(f"客户端断开: {client_id}") + except Exception as e: + logger.error(f"处理客户端 {client_id} 时出错: {e}") + logger.error(traceback.format_exc()) + finally: + if client_id in self.clients: + del self.clients[client_id] + writer.close() + await writer.wait_closed() -async def data_push(device_code, message): - global_config = GlobalConfig() - harmful_push_config = global_config.get_harmful_gas_push_config() - if harmful_push_config and harmful_push_config.push_url: - last_ts = push_ts_dict.get(device_code) - current_time = datetime.now() - - # 检查是否需要推送数据 - if last_ts is None or (current_time - last_ts).total_seconds() > harmful_push_config.push_interval: - # 将字符串编码为字节类型 - encoded_bytes = base64.b64encode(message.encode('utf-8')) - # 将字节编码结果转换为字符串 - encoded_string = encoded_bytes.decode('utf-8') - print(f'before encode: {message}') - print(f'after encode: {encoded_string}') - push_message = {"content": encoded_string} - print(f'body: {push_message}') - asyncio.create_task(send_request_async(harmful_push_config.push_url, push_message)) - push_ts_dict[device_code] = current_time # 更新推送时间戳 - else: - print('no harmful push config') - - -async def handle_message(message): - message = message.replace('\r\n', '\n').replace('\r', '\n') - - # 处理消息 - harmful_gas_pattern = r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$" - match = re.match(harmful_gas_pattern, message, re.DOTALL) - if match: - device_code = match.group(1) # 设备号 - print(f"设备号: {device_code}") - sensor_data_str = "{" + match.group(2) + "}" # JSON数组部分 - sensor_data_json = json.loads(sensor_data_str) - - if sensor_data_json.get('sensorDatas'): - sensor_data = sensor_data_json.get('sensorDatas') - parse_sensor_data(device_code, sensor_data) - - await data_push(device_code, message) - else: - print("无法解析消息") - - -# 处理客户端连接 -async def handle_client(reader, writer): - client_address = writer.get_extra_info('peername') - print(f"新连接: {client_address}") - - try: - while True: - # 接收数据 - data = await reader.read(1024) - if not data: - print(f"连接关闭: {client_address}") - break - + async def _process_data(self, data: str) -> None: + """处理接收到的数据""" + # 触发数据回调 + for callback in self.on_data_callbacks: try: - message = data.decode('utf-8') - print(f"收到数据({client_address}): {repr(message)}") - await handle_message(message) - except UnicodeDecodeError as e: - print(f"无法解析消息来自 {client_address}: {e}") - # You can either log the raw data or take other actions - print(f"原始数据: {data}") + if asyncio.iscoroutinefunction(callback): + await callback(data) + else: + callback(data) + except Exception as e: + logger.error(f"执行回调时出错: {e}") - await handle_message(message) + async def send_data(self, client_id: str, data: bytes) -> bool: + """发送数据到客户端""" + if client_id not in self.clients: + logger.warning(f"客户端 {client_id} 未连接") + return False - except ConnectionResetError: - print(f"客户端断开: {client_address}") - finally: - writer.close() - await writer.wait_closed() + writer = self.clients[client_id] + try: + writer.write(data) + await writer.drain() + return True + except Exception as e: + logger.error(f"向客户端 {client_id} 发送数据时出错: {e}") + logger.error(traceback.format_exc()) + return False - -# 主服务器函数 -async def start_server(): - server = await asyncio.start_server(handle_client, HOST, PORT) - print(f"服务器启动,监听地址: {HOST}:{PORT}") - - async with server: - await server.serve_forever() - - -if __name__ == "__main__": - asyncio.run(start_server()) + async def broadcast(self, data: bytes, exclude: Optional[List[str]] = None) -> None: + """广播数据到所有客户端""" + exclude = exclude or [] + for client_id, writer in self.clients.items(): + if client_id not in exclude: + try: + writer.write(data) + await writer.drain() + except Exception as e: + logger.error(f"向客户端 {client_id} 广播时出错: {e}") diff --git a/tcp/tcp_server.py b/tcp/tcp_server.py index 57011b5..e99bf0a 100644 --- a/tcp/tcp_server.py +++ b/tcp/tcp_server.py @@ -1,182 +1,107 @@ +# TCP服务器类 import asyncio -import base64 -from datetime import datetime -import re -import json - -from common.harmful_gas_manager import HarmfulGasManager -from common.http_utils import send_request_async -from services.global_config import GlobalConfig - -HOST = '0.0.0.0' -PORT = 9001 - -harmful_gas_manager = HarmfulGasManager() -push_ts_dict = {} - -gas_units = { - 0: "%LEL", - 1: "%VOL", - 2: "PPM", - 3: "umol/mol", - 4: "mg/m3", - 5: "ug/m3", - 6: "℃", - 7: "%" -} - -decimals = { - 0: "没有小数点", - 1: "有一位小数", - 2: "有两位小数", - 3: "有三位小数" -} - -gas_statuses = { - 0: "预热", - 1: "正常", - 3: "传感器故障", - 5: "低限报警", - 6: "高限报警" -} - -gas_types = { - 3: "硫化氢 (H2S)", - 4: "一氧化碳 (CO)", - 5: "氧气 (O2)", - 50: "可燃气体 (Ex)", -} +from typing import Dict, List, Callable, Optional, Any, Type +import traceback +from common.global_logger import logger -def handle_precision(gas_value, gas_dec): - if gas_dec == 0: - return gas_value - elif gas_dec == 1: - return gas_value / 10 - elif gas_dec == 2: - return gas_value / 100 - elif gas_dec == 3: - return gas_value / 1000 - else: - return gas_value +class TcpServer: + """TCP服务器,处理设备连接和数据接收""" + def __init__(self, host: str = "0.0.0.0", port: int = 9001): + self.host = host + self.port = port + self.server = None + self.clients: Dict[str, asyncio.StreamWriter] = {} + self.on_data_callbacks: List[Callable] = [] -# 示例数据解析函数 -def parse_sensor_data(device_code, sensor_data): - for data in sensor_data: - flag = data.get("flag") - gas_value = data.get("gas_value") - gas_dec = data.get("gas_dec") - gas_status = data.get("gas_status") - gas_type = data.get("gas_type") - gas_unit = data.get("gas_unit") + def register_data_callback(self, callback: Callable) -> None: + """注册数据回调函数""" + self.on_data_callbacks.append(callback) - # 获取单位、精度、状态和气体类型的描述 - unit = gas_units.get(gas_unit, "未知单位") - precision = decimals.get(gas_dec, "未知精度") - status = gas_statuses.get(gas_status, "未知状态") - gas_type_name = gas_types.get(gas_type, "未知气体") + async def start(self) -> None: + """启动TCP服务器""" + try: + self.server = await asyncio.start_server( + self._handle_client, self.host, self.port + ) + addr = self.server.sockets[0].getsockname() + logger.info(f"TCP服务器启动在 {addr}") - # 格式化气体浓度(根据精度进行转换) - gas_value = handle_precision(gas_value, gas_dec) + async with self.server: + await self.server.serve_forever() + except Exception as e: + logger.error(f"启动TCP服务器时出错: {e}") + logger.error(traceback.format_exc()) - gas_data = { - "flag": flag, - "gas_value": gas_value, - "gas_unit": unit, - "gas_status": status, - "gas_type": gas_type_name, - "precision": precision, - 'gas_ts': datetime.now() - } + async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + """处理客户端连接""" + addr = writer.get_extra_info("peername") + client_id = f"{addr[0]}:{addr[1]}" + self.clients[client_id] = writer + logger.info(f"客户端连接: {client_id}") - harmful_gas_manager.set_device_data(device_code, gas_type, gas_data) - print(harmful_gas_manager.get_device_all_data(device_code)) + try: + while True: + # 接收数据 + data = await reader.read(1024) + if not data: + logger.info(f"连接关闭: {client_id}") + break + try: + message = data.decode('utf-8') + logger.info(f"收到数据({client_id}): {repr(message)}") + await self._process_data(message) + except UnicodeDecodeError as e: + logger.error(f"无法解析消息来自 {client_id}: {e}") + logger.debug(f"原始数据: {data}") + except ConnectionResetError: + logger.info(f"客户端断开: {client_id}") + except Exception as e: + logger.error(f"处理客户端 {client_id} 时出错: {e}") + logger.error(traceback.format_exc()) + finally: + if client_id in self.clients: + del self.clients[client_id] + writer.close() + await writer.wait_closed() -async def data_push(device_code, message): - global_config = GlobalConfig() - harmful_push_config = global_config.get_harmful_gas_push_config() - if harmful_push_config and harmful_push_config.push_url: - last_ts = push_ts_dict.get(device_code) - current_time = datetime.now() - - # 检查是否需要推送数据 - if last_ts is None or (current_time - last_ts).total_seconds() > harmful_push_config.push_interval: - # 将字符串编码为字节类型 - encoded_bytes = base64.b64encode(message.encode('utf-8')) - # 将字节编码结果转换为字符串 - encoded_string = encoded_bytes.decode('utf-8') - print(f'before encode: {message}') - print(f'after encode: {encoded_string}') - push_message = {"content": encoded_string} - print(f'body: {push_message}') - asyncio.create_task(send_request_async(harmful_push_config.push_url, push_message)) - push_ts_dict[device_code] = current_time # 更新推送时间戳 - else: - print('no harmful push config') - - -async def handle_message(message): - message = message.replace('\r\n', '\n').replace('\r', '\n') - - # 处理消息 - harmful_gas_pattern = r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$" - match = re.match(harmful_gas_pattern, message, re.DOTALL) - if match: - device_code = match.group(1) # 设备号 - print(f"设备号: {device_code}") - sensor_data_str = "{" + match.group(2) + "}" # JSON数组部分 - sensor_data_json = json.loads(sensor_data_str) - - if sensor_data_json.get('sensorDatas'): - sensor_data = sensor_data_json.get('sensorDatas') - parse_sensor_data(device_code, sensor_data) - - await data_push(device_code, message) - else: - print("无法解析消息") - - -# 处理客户端连接 -async def handle_client(reader, writer): - client_address = writer.get_extra_info('peername') - print(f"新连接: {client_address}") - - try: - while True: - # 接收数据 - data = await reader.read(1024) - if not data: - print(f"连接关闭: {client_address}") - break - + async def _process_data(self, data: str) -> None: + """处理接收到的数据""" + # 触发数据回调 + for callback in self.on_data_callbacks: try: - message = data.decode('utf-8') - print(f"收到数据({client_address}): {repr(message)}") - await handle_message(message) - except UnicodeDecodeError as e: - print(f"无法解析消息来自 {client_address}: {e}") - # You can either log the raw data or take other actions - print(f"原始数据: {data}") + if asyncio.iscoroutinefunction(callback): + await callback(data) + else: + callback(data) + except Exception as e: + logger.error(f"执行回调时出错: {e}") - await handle_message(message) + async def send_data(self, client_id: str, data: bytes) -> bool: + """发送数据到客户端""" + if client_id not in self.clients: + logger.warning(f"客户端 {client_id} 未连接") + return False - except ConnectionResetError: - print(f"客户端断开: {client_address}") - finally: - writer.close() - await writer.wait_closed() + writer = self.clients[client_id] + try: + writer.write(data) + await writer.drain() + return True + except Exception as e: + logger.error(f"向客户端 {client_id} 发送数据时出错: {e}") + logger.error(traceback.format_exc()) + return False - -# 主服务器函数 -async def start_server(): - server = await asyncio.start_server(handle_client, HOST, PORT) - print(f"服务器启动,监听地址: {HOST}:{PORT}") - - async with server: - await server.serve_forever() - - -if __name__ == "__main__": - asyncio.run(start_server()) + async def broadcast(self, data: bytes, exclude: Optional[List[str]] = None) -> None: + """广播数据到所有客户端""" + exclude = exclude or [] + for client_id, writer in self.clients.items(): + if client_id not in exclude: + try: + writer.write(data) + await writer.drain() + except Exception as e: + logger.error(f"向客户端 {client_id} 广播时出错: {e}") diff --git a/tcp/tcp_server.py.bak b/tcp/tcp_server.py.bak new file mode 100644 index 0000000..57011b5 --- /dev/null +++ b/tcp/tcp_server.py.bak @@ -0,0 +1,182 @@ +import asyncio +import base64 +from datetime import datetime +import re +import json + +from common.harmful_gas_manager import HarmfulGasManager +from common.http_utils import send_request_async +from services.global_config import GlobalConfig + +HOST = '0.0.0.0' +PORT = 9001 + +harmful_gas_manager = HarmfulGasManager() +push_ts_dict = {} + +gas_units = { + 0: "%LEL", + 1: "%VOL", + 2: "PPM", + 3: "umol/mol", + 4: "mg/m3", + 5: "ug/m3", + 6: "℃", + 7: "%" +} + +decimals = { + 0: "没有小数点", + 1: "有一位小数", + 2: "有两位小数", + 3: "有三位小数" +} + +gas_statuses = { + 0: "预热", + 1: "正常", + 3: "传感器故障", + 5: "低限报警", + 6: "高限报警" +} + +gas_types = { + 3: "硫化氢 (H2S)", + 4: "一氧化碳 (CO)", + 5: "氧气 (O2)", + 50: "可燃气体 (Ex)", +} + + +def handle_precision(gas_value, gas_dec): + if gas_dec == 0: + return gas_value + elif gas_dec == 1: + return gas_value / 10 + elif gas_dec == 2: + return gas_value / 100 + elif gas_dec == 3: + return gas_value / 1000 + else: + return gas_value + + +# 示例数据解析函数 +def parse_sensor_data(device_code, sensor_data): + for data in sensor_data: + flag = data.get("flag") + gas_value = data.get("gas_value") + gas_dec = data.get("gas_dec") + gas_status = data.get("gas_status") + gas_type = data.get("gas_type") + gas_unit = data.get("gas_unit") + + # 获取单位、精度、状态和气体类型的描述 + unit = gas_units.get(gas_unit, "未知单位") + precision = decimals.get(gas_dec, "未知精度") + status = gas_statuses.get(gas_status, "未知状态") + gas_type_name = gas_types.get(gas_type, "未知气体") + + # 格式化气体浓度(根据精度进行转换) + gas_value = handle_precision(gas_value, gas_dec) + + gas_data = { + "flag": flag, + "gas_value": gas_value, + "gas_unit": unit, + "gas_status": status, + "gas_type": gas_type_name, + "precision": precision, + 'gas_ts': datetime.now() + } + + harmful_gas_manager.set_device_data(device_code, gas_type, gas_data) + print(harmful_gas_manager.get_device_all_data(device_code)) + + +async def data_push(device_code, message): + global_config = GlobalConfig() + harmful_push_config = global_config.get_harmful_gas_push_config() + if harmful_push_config and harmful_push_config.push_url: + last_ts = push_ts_dict.get(device_code) + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > harmful_push_config.push_interval: + # 将字符串编码为字节类型 + encoded_bytes = base64.b64encode(message.encode('utf-8')) + # 将字节编码结果转换为字符串 + encoded_string = encoded_bytes.decode('utf-8') + print(f'before encode: {message}') + print(f'after encode: {encoded_string}') + push_message = {"content": encoded_string} + print(f'body: {push_message}') + asyncio.create_task(send_request_async(harmful_push_config.push_url, push_message)) + push_ts_dict[device_code] = current_time # 更新推送时间戳 + else: + print('no harmful push config') + + +async def handle_message(message): + message = message.replace('\r\n', '\n').replace('\r', '\n') + + # 处理消息 + harmful_gas_pattern = r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$" + match = re.match(harmful_gas_pattern, message, re.DOTALL) + if match: + device_code = match.group(1) # 设备号 + print(f"设备号: {device_code}") + sensor_data_str = "{" + match.group(2) + "}" # JSON数组部分 + sensor_data_json = json.loads(sensor_data_str) + + if sensor_data_json.get('sensorDatas'): + sensor_data = sensor_data_json.get('sensorDatas') + parse_sensor_data(device_code, sensor_data) + + await data_push(device_code, message) + else: + print("无法解析消息") + + +# 处理客户端连接 +async def handle_client(reader, writer): + client_address = writer.get_extra_info('peername') + print(f"新连接: {client_address}") + + try: + while True: + # 接收数据 + data = await reader.read(1024) + if not data: + print(f"连接关闭: {client_address}") + break + + try: + message = data.decode('utf-8') + print(f"收到数据({client_address}): {repr(message)}") + await handle_message(message) + except UnicodeDecodeError as e: + print(f"无法解析消息来自 {client_address}: {e}") + # You can either log the raw data or take other actions + print(f"原始数据: {data}") + + await handle_message(message) + + except ConnectionResetError: + print(f"客户端断开: {client_address}") + finally: + writer.close() + await writer.wait_closed() + + +# 主服务器函数 +async def start_server(): + server = await asyncio.start_server(handle_client, HOST, PORT) + print(f"服务器启动,监听地址: {HOST}:{PORT}") + + async with server: + await server.serve_forever() + + +if __name__ == "__main__": + asyncio.run(start_server())