package com.casic.missiles.mqtt;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.casic.missiles.model.exception.ServiceException;
import com.casic.missiles.modular.robot.opt.enums.InstructCodeEnums;
import com.casic.missiles.modular.robot.opt.instruct.base.dto.MessageRequestDTO;
import com.casic.missiles.modular.robot.opt.instruct.base.dto.RobotMsgResponseDTO;
import com.casic.missiles.mqtt.config.MqttClientConnection;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * MQTT message handling utilities: converts raw reply JSON into response DTOs and
 * performs a publish-then-wait-for-reply exchange over an MQTT connection.
 */
@Slf4j
public class MsgUtils {

    /**
     * Shared string-to-string map. It is not read or written anywhere in this class;
     * it is kept public and non-final because code outside this file may use or reassign it.
     */
    public static Map<String, String> robotResult = new ConcurrentHashMap<>();

    /** Default QoS used by the three-argument {@code senMsg} overload. */
    private static final int DEFAULT_QOS = 1;

    /** Upper bound on how long a reply is awaited (previously 60 polls of 200 ms each). */
    private static final long REPLY_TIMEOUT_MILLIS = 12_000L;

    /**
     * Parses a reply JSON string into the response type registered on the instruction enum.
     *
     * @param msgJson reply JSON text
     * @param enums   instruction descriptor; its {@code getCls()} selects the target type
     * @param <T>     payload type the caller expects inside the response
     * @return the parsed response (whatever {@code JSON.parseObject} yields for the input)
     */
    @SuppressWarnings("unchecked") // The target type comes from the enum at runtime; the compiler cannot verify T.
    public static <T> RobotMsgResponseDTO<T> convertMessage(String msgJson, InstructCodeEnums enums) {
        return (RobotMsgResponseDTO<T>) JSON.parseObject(msgJson, enums.getCls());
    }

    /**
     * Sends an instruction and waits for its reply, using QoS 1.
     *
     * @param mqttClient MQTT connection wrapper
     * @param enums      instruction descriptor (request topic, reply topic, wait flag, remarks)
     * @param messageDTO request body, serialized to JSON before publishing
     * @param <T>        payload type of the response
     * @return the reply with {@code isSuccess=true}, or an empty response with
     *         {@code isSuccess=false} when no reply is available
     */
    public static <T> RobotMsgResponseDTO<T> senMsg(MqttClientConnection mqttClient, InstructCodeEnums enums,
                                                    MessageRequestDTO messageDTO) {
        return senMsg(mqttClient, enums, messageDTO, DEFAULT_QOS);
    }

    /**
     * Sends an instruction and, when the instruction is flagged as waiting for a reply,
     * blocks for up to 12 seconds until the reply arrives on the reply topic.
     *
     * <p>The reply topic is subscribed <em>before</em> the request is published so that a
     * fast reply cannot be missed. The subscription is not removed afterwards.
     * NOTE(review): unsubscribing here could interfere with other subscribers of the same
     * topic on this client — confirm before adding cleanup.
     *
     * @param mqttClient MQTT connection wrapper; must not be {@code null}
     * @param enums      instruction descriptor (request topic, reply topic, wait flag, remarks)
     * @param messageDTO request body, serialized to JSON before publishing
     * @param qos        QoS used for the reply-topic subscription; publishing goes through
     *                   {@code mqttClient.pub(topic, payload)}, which takes no QoS argument here
     * @param <T>        payload type of the response
     * @return the reply with {@code isSuccess=true}, or an empty response with
     *         {@code isSuccess=false} when no reply is available (timeout, or the
     *         instruction does not wait for replies)
     * @throws ServiceException     if {@code mqttClient} is {@code null}, an MQTT error occurs,
     *                              or the reply is empty or cannot be parsed
     * @throws InterruptedException (sneakily) if the waiting thread is interrupted; the
     *                              interrupt flag is restored first
     */
    @SneakyThrows
    public static <T> RobotMsgResponseDTO<T> senMsg(MqttClientConnection mqttClient, InstructCodeEnums enums,
                                                    MessageRequestDTO messageDTO, int qos) {
        CompletableFuture<RobotMsgResponseDTO<T>> reply = new CompletableFuture<>();
        try {
            if (mqttClient == null) {
                throw new ServiceException(500, "机器人连接异常,请检查网络是否正常");
            }
            MqttClient client = mqttClient.getMqttClient();

            // Subscribe to the reply topic first so the reply cannot arrive before we listen.
            if (StrUtil.isNotEmpty(enums.getRepName())) {
                client.subscribe(enums.getRepName(), qos,
                        (topic, message) -> handleReply(message.getPayload(), enums, reply));
            }

            if (StrUtil.isNotEmpty(enums.getReqName())) {
                String requestJson = JSON.toJSONString(messageDTO);
                log.info("下发指令:{}", requestJson);
                mqttClient.pub(enums.getReqName(), requestJson);
            }
        } catch (MqttException e) {
            log.error("mqtt订阅报错", e);
            throw new ServiceException(500, enums.getRemarks() + "指令异常");
        }

        RobotMsgResponseDTO<T> result = null;
        try {
            result = enums.isWaitReply()
                    ? reply.get(REPLY_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
                    : reply.getNow(null);
        } catch (TimeoutException ignored) {
            // No reply within the window: reported through isSuccess=false below.
        } catch (ExecutionException | CompletionException e) {
            // The listener recorded a failure (empty or unparsable reply); surface it to the caller.
            log.error("mqtt回复处理报错", e);
            throw new ServiceException(500, enums.getRemarks() + "指令异常");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        }

        if (result == null) {
            RobotMsgResponseDTO<T> failure = new RobotMsgResponseDTO<>();
            failure.setIsSuccess(false);
            return failure;
        }
        result.setIsSuccess(true);
        return result;
    }

    /**
     * Decodes a reply payload and completes {@code reply} with the parsed response, or with
     * the failure. Runs on the MQTT callback thread and therefore never throws: per the Paho
     * callback contract, an exception escaping a message callback shuts the client down.
     */
    private static <T> void handleReply(byte[] payload, InstructCodeEnums enums,
                                        CompletableFuture<RobotMsgResponseDTO<T>> reply) {
        try {
            String data = stripJsonStringWrapper(new String(payload, StandardCharsets.UTF_8));
            RobotMsgResponseDTO<T> parsed =
                    StrUtil.isEmpty(data) ? null : MsgUtils.<T>convertMessage(data, enums);
            if (parsed == null) {
                reply.completeExceptionally(new ServiceException(500, enums.getRemarks() + "指令异常"));
            } else {
                // Only the first reply is kept; later completions are no-ops.
                reply.complete(parsed);
            }
        } catch (RuntimeException e) {
            reply.completeExceptionally(e);
        }
    }

    /**
     * Removes one leading and one trailing double quote, then deletes every backslash,
     * turning a JSON document that was sent as an escaped JSON string back into plain JSON.
     */
    private static String stripJsonStringWrapper(String data) {
        String text = data;
        if (text.startsWith("\"")) {
            text = text.substring(1);
        }
        if (text.endsWith("\"")) {
            text = text.substring(0, text.length() - 1);
        }
        return text.replace("\\", "");
    }

    /**
     * Manual smoke check: parses a sample reply into a typed response and prints it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String json = "{\"msg\": {\"error_code\": 0, \"error_msg\": \"\", \"result\": [\"79-1\", \"11\"]}}";
        RobotMsgResponseDTO<List<String>> responseDTO =
                JSON.parseObject(json, new TypeReference<RobotMsgResponseDTO<List<String>>>() {
                });
        System.out.println(responseDTO);
    }
}