import asyncio import base64 import json import re import time from copy import deepcopy import cv2 from config import BIZ_CLASS, PRESET_CONTENT_MAP from global_logger import logger, process_log_data from image_plotting import Annotator, COLOR_RED, COLOR_BLUE class HandelTCPCommand: def __init__(self, camera_processors, http_client): self.camera_processors = camera_processors self.http_client = http_client @staticmethod def get_cap_content(preset, gas_data): """获取捕获内容的配置信息""" content = PRESET_CONTENT_MAP.get(str(preset)) if content is None: return '', '', 0 report_type, desc, status = content if status is None: # 燃气泄漏特殊处理 status = 0 if float(gas_data) <= 0 else 1 return report_type, desc, status def _process_boxes_with_annotation(self, frame, biz_boxes, box_type, gas_alarm): """处理特定类型的检测框并添加标注""" boxes = [box for box in biz_boxes if box[5] in BIZ_CLASS[box_type]] if not boxes: return None, None annotator = Annotator(deepcopy(frame)) color = COLOR_RED if gas_alarm else COLOR_BLUE for box in boxes: annotator.box_label( box=[box[0], box[1], box[2], box[3]], label='井盖' if box[5].index('井盖')>=0 else box[5], color=color ) success, jpg_data = cv2.imencode('.jpg', annotator.result()) if not success: return None, None # 根据不同类型返回对应的报告信息 report_info = { 'WELL': ('10007', f'井盖完整,燃气浓度{"正常" if not gas_alarm else "异常"}'), 'ENTER': ('10008', f'检测到引入口,燃气浓度{"正常" if not gas_alarm else "异常"}'), 'BOX': ('10009', f'检测到调压箱,燃气浓度{"正常" if not gas_alarm else "异常"}') } return jpg_data, report_info.get(box_type) async def capture_and_send_current_frame(self, camera_processor, preset, gas_data, longitude, latitude): """捕获当前帧并通过HTTP发送到后台""" cam_id = camera_processor.cam_id logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧") # 使用get_last_frame方法获取最后一帧,不直接访问内部属性 frame, biz_boxes = camera_processor.get_last_frame() if frame is None: logger.warning(f"摄像头 {cam_id} 没有可用的帧") return False try: skip_img = int(preset) == 5 and float(gas_data) <= 0 gas_alarm = 0 if float(gas_data) <= 0 else 1 # 获取基础配置 report_type, content, status = self.get_cap_content(preset, gas_data) jpg_data = None if not skip_img: # 按优先级依次检查各种类型的检测框 for box_type in ['WELL', 'ENTER', 'BOX']: jpg_data, report_info = self._process_boxes_with_annotation( frame, biz_boxes, box_type, gas_alarm ) if jpg_data is not None: report_type, content = report_info status = gas_alarm break # 如果没有检测到任何目标,使用原始帧 if jpg_data is None and not skip_img: success, jpg_data = cv2.imencode('.jpg', frame) if not success: logger.error(f"摄像头 {cam_id} 编码图像失败") return False request_data = { "routeNumber": preset, "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if ( jpg_data is not None and jpg_data.size > 0) else '', "reportType": report_type, "reportContent": content, "isAlarm": status, "reportValue": gas_data, "gpsPoints": f'{longitude},{latitude}' } logger.debug(process_log_data(request_data)) # 发送HTTP请求 await self.http_client.send([request_data]) logger.info(f"摄像头 {cam_id} 已发送当前帧到后台") return True except Exception as e: logger.exception(f"发送当前帧时发生错误: {e}") return False async def handle_capture_command(self, message): """处理TCP服务器发来的命令""" try: logger.info(f"处理TCP命令: {message}") pattern = r"^(\d+),([^,]+),([^,]+),([^,]+)$" # 检查是否是捕获图像的命令 match = re.match(pattern, message.strip()) if match: preset, gas_data, longitude, latitude = match.groups() logger.debug( f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}") # 对所有摄像头执行 for cam_id, processor in self.camera_processors.items(): await self.capture_and_send_current_frame(processor, preset, gas_data, longitude, latitude) logger.info("图像捕获和发送完成") # 发送处理完成的响应 response = { "status": "success", "timestamp": time.time(), "message": "图像已成功捕获并发送" } return response except json.JSONDecodeError: logger.error(f"无法解析JSON命令: {message}") except Exception as e: logger.exception(f"处理TCP命令时出错: {e}") # 发送错误响应 return { "status": "error", "timestamp": time.time(), "message": "处理命令时出错" }