Newer
Older
go-algo-server / handle_tcp_command.py
zhangyingjie on 2 Apr 6 KB 增加点位识别信息上报
import asyncio
import base64
import json
import re
import time
from copy import deepcopy

import cv2

from config import BIZ_CLASS, PRESET_CONTENT_MAP
from global_logger import logger, process_log_data
from image_plotting import Annotator, COLOR_RED, COLOR_BLUE


class HandelTCPCommand:
    def __init__(self, camera_processors, http_client):
        self.camera_processors = camera_processors
        self.http_client = http_client

    @staticmethod
    def get_cap_content(preset, gas_data):
        """获取捕获内容的配置信息"""
        content = PRESET_CONTENT_MAP.get(str(preset))
        if content is None:
            return '', '', 0
            
        report_type, desc, status = content
        if status is None:  # 燃气泄漏特殊处理
            status = 0 if float(gas_data) <= 0 else 1
        return report_type, desc, status
    
    def _process_boxes_with_annotation(self, frame, biz_boxes, box_type, gas_alarm):
        """处理特定类型的检测框并添加标注"""
        boxes = [box for box in biz_boxes if box[5] in BIZ_CLASS[box_type]]
        if not boxes:
            return None, None
            
        annotator = Annotator(deepcopy(frame))
        color = COLOR_RED if gas_alarm else COLOR_BLUE
        
        for box in boxes:
            annotator.box_label(
                box=[box[0], box[1], box[2], box[3]],
                label='井盖' if box[5].index('井盖')>=0 else box[5],
                color=color
            )
            
        success, jpg_data = cv2.imencode('.jpg', annotator.result())
        if not success:
            return None, None
            
        # 根据不同类型返回对应的报告信息
        report_info = {
            'WELL': ('10007', f'井盖完整,燃气浓度{"正常" if not gas_alarm else "异常"}'),
            'ENTER': ('10008', f'检测到引入口,燃气浓度{"正常" if not gas_alarm else "异常"}'),
            'BOX': ('10009', f'检测到调压箱,燃气浓度{"正常" if not gas_alarm else "异常"}')
        }
        
        return jpg_data, report_info.get(box_type)

    async def capture_and_send_current_frame(self, camera_processor, preset, gas_data, longitude, latitude):
        """捕获当前帧并通过HTTP发送到后台"""

        cam_id = camera_processor.cam_id
        logger.info(f"摄像头 {cam_id} 准备捕获并发送当前帧")

        # 使用get_last_frame方法获取最后一帧,不直接访问内部属性
        frame, biz_boxes = camera_processor.get_last_frame()
        if frame is None:
            logger.warning(f"摄像头 {cam_id} 没有可用的帧")
            return False

        try:
            skip_img = int(preset) == 5 and float(gas_data) <= 0
            gas_alarm = 0 if float(gas_data) <= 0 else 1

            # 获取基础配置
            report_type, content, status = self.get_cap_content(preset, gas_data)
            jpg_data = None
            
            if not skip_img:
                # 按优先级依次检查各种类型的检测框
                for box_type in ['WELL', 'ENTER', 'BOX']:
                    jpg_data, report_info = self._process_boxes_with_annotation(
                        frame, biz_boxes, box_type, gas_alarm
                    )
                    if jpg_data is not None:
                        report_type, content = report_info
                        status = gas_alarm
                        break
                
                # 如果没有检测到任何目标,使用原始帧
                if jpg_data is None and not skip_img:
                    success, jpg_data = cv2.imencode('.jpg', frame)
                    if not success:
                        logger.error(f"摄像头 {cam_id} 编码图像失败")
                        return False

            
            request_data = {
                "routeNumber": preset,
                "picture": base64.b64encode(jpg_data.tobytes()).decode('utf-8') if (
                            jpg_data is not None and jpg_data.size > 0) else '',
                "reportType": report_type,
                "reportContent": content,
                "isAlarm": status,
                "reportValue": gas_data,
                "gpsPoints": f'{longitude},{latitude}'
            }

            logger.debug(process_log_data(request_data))
            # 发送HTTP请求
            await self.http_client.send([request_data])
            logger.info(f"摄像头 {cam_id} 已发送当前帧到后台")
            return True
        except Exception as e:
            logger.exception(f"发送当前帧时发生错误: {e}")
            return False

    async def handle_capture_command(self, message):
        """处理TCP服务器发来的命令"""
        try:
            logger.info(f"处理TCP命令: {message}")

            pattern = r"^(\d+),([^,]+),([^,]+),([^,]+)$"

            # 检查是否是捕获图像的命令
            match = re.match(pattern, message.strip())
            if match:
                preset, gas_data, longitude, latitude = match.groups()

                logger.debug(
                    f"preset = {preset}, gas_data = {gas_data}, longitude = {longitude}, latitude = {latitude}")

                # 对所有摄像头执行
                for cam_id, processor in self.camera_processors.items():
                    await self.capture_and_send_current_frame(processor, preset, gas_data, longitude, latitude)

                logger.info("图像捕获和发送完成")

                # 发送处理完成的响应
                response = {
                    "status": "success",
                    "timestamp": time.time(),
                    "message": "图像已成功捕获并发送"
                }
                return response
        except json.JSONDecodeError:
            logger.error(f"无法解析JSON命令: {message}")
        except Exception as e:
            logger.exception(f"处理TCP命令时出错: {e}")

        # 发送错误响应
        return {
            "status": "error",
            "timestamp": time.time(),
            "message": "处理命令时出错"
        }