diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java index 569b744..2258d38 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java @@ -23,6 +23,7 @@ /** * 流程节点事件监听器 + * 协议流程监听器,主要用于监控解析各个节点的解析内容,这些节点包括解密前的报文、解密后的明文、解析的数据、以及解析过程中出现的异常情况 */ public class ProtocolProcessEventListener { @@ -123,11 +124,11 @@ } /** - * 存库操作 + * 调用现场,将解析节点信息执行异步存库 * * @param devcode */ - public static void saveData(String devcode, ParseResult result) { + public static void saveData(String devcode) { //添加到线程池执行,异步存库 threadPool.execute( () -> { diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java index 569b744..2258d38 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java @@ -23,6 +23,7 @@ /** * 流程节点事件监听器 + * 协议流程监听器,主要用于监控解析各个节点的解析内容,这些节点包括解密前的报文、解密后的明文、解析的数据、以及解析过程中出现的异常情况 */ public class ProtocolProcessEventListener { @@ -123,11 +124,11 @@ } /** - * 存库操作 + * 调用现场,将解析节点信息执行异步存库 * * @param devcode */ - public static void saveData(String devcode, ParseResult result) { + public static void saveData(String devcode) { //添加到线程池执行,异步存库 threadPool.execute( () -> { diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index 1a8e9d8..b91ed97 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -34,6 +34,8 @@ public class GenericProtocolParser extends ProtocolParserSupport implements ProtocolParser, ReplyCommandEnum { /** + * 方法中最重要的节点看根据序号查看 + *

* 标准数据报文的核心解析流程方法 * 1、前导码匹配报文协议 * 2、构建协议工厂(初始化获取与协议有关的数据库配置) @@ -48,9 +50,9 @@ */ @Override public ParseResult doParseProtocol(ByteBuf byteBuf, ProtocolConfigProvider protocolConfigProvider) { - //匹配前导码 + //1、匹配前导码,获取解析协议 ProtocolConfig protocolConfig = LeadingCodeMatcher.matchFrameLeadingCode(byteBuf, protocolConfigProvider); - //如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 + //健壮性校验,如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 if (ObjectUtil.isEmpty(protocolConfig)) { return null; } @@ -61,7 +63,7 @@ Map parseFixedDataMap = getParseFixedDataMap(protocolFactory, byteBuf); try { devcode = (String) parseFixedDataMap.get("devcode"); - // 通过协议工厂匹配,匹配规则,获取规则配置 + //2、通过协议工厂匹配,匹配规则,获取规则配置 RuleConfig ruleConfig = getRuleConfig(protocolFactory, parseFixedDataMap); if (ObjectUtil.isEmpty(ruleConfig)) { //打印源数据,设备编号 @@ -74,19 +76,19 @@ } //创建规则相关的工厂,流程实例、字节解析、组合字段解析、字段解析规则等有关业务解析配置 AbstractRuleConfigFactory ruleConfigFactory = new DefaultRuleFactory(ruleConfig.getId()); - //获取流程实例配置 + //3、获取流程实例配置,判断配置是否完整正确 ProcessorInstanceProvider datagramEventProvider = ruleConfigFactory.getDatagramEventProvider(); DatagramEventConfig datagramEventConfig = datagramEventProvider.getProcessorInstance(); Assert.isFalse(Objects.isNull(datagramEventConfig), () -> { throw new EngineException(EngineExceptionEnum.PROTOCOL_INSTANCE_NULL); }); - //处理粘包拆包的主要组合 + //4、处理粘包拆包的主要组合,通过帧结构的进行处理 List frameStructDispenserList = ClazzUtil.getSubClassList(FrameStructMatcher.class, true); ByteBuf intactMessageByte = null; - //通过匹配帧结构获取完整的数据包,验证是否是一个完整的帧结构 + //通过匹配帧结构判断是否获取了完整的数据包,没有则挂起或舍弃 for (FrameStructMatcher frameStructMatcher : frameStructDispenserList) { //帧结构该协议,经过帧结构判断选定帧协议完整的数据报文 - if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory,datagramEventProvider, parseFixedDataMap)) != null) { + if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory, datagramEventProvider, parseFixedDataMap)) != null) { break; } } @@ -97,45 +99,50 @@ } //获取报文的业务内容 ByteBuf bizDataByteBuf = protocolFactory.getProtocolFieldConfigProvider().getDataContentBuf(intactMessageByte); - //解密前的报文 - log.debug("解析的密文是----------" + ByteBufUtil.hexDump(bizDataByteBuf)); - //ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(bizDataByteBuf), 1); - //密文解析 + //5、业务内容密文解析 ByteBuf clearZeroPlainBuf = datagramEventProvider.getSafeDatagram(bizDataByteBuf, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); log.debug("解析的明文是----------" + ByteBufUtil.hexDump(clearZeroPlainBuf)); + //保存解密后报文信息到内存,ProtocolProcessEventListene协议流程监听器,主要 ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(clearZeroPlainBuf), 2); - //解密后报文 - //解析组合业务字段 + //6、解析tag业务字段 ruleConfigFactory.getCombinedFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, ruleConfigFactory.getFieldConfigProvider().getFieldConfigsMap(), protocolConfig.getFieldRuleConfigMap()); //解析单个业务字段 ruleConfigFactory.getFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, protocolConfig.getFieldRuleConfigMap()); - //构建发送解析任务 + //7、构建解析的数据集合 List> bizDataMap = buildStoreData(ruleConfigFactory, protocolFactory); //定制字段处理,例如分别解析的日期需要进行日期的和合并 invokeFieldPostProcessing(bizDataMap); - //根据解析内容,判断回复指令,构建解析返回内容 + //8、根据解析业务内容,判断回复指令,为构建解析返回内容做准备 List replyCommandPostProcessings = ClazzUtil.getSubClassList(AbstractReplyCommandPostProcessing.class, false); for (AbstractReplyCommandPostProcessing replyCommandPostProcessing : replyCommandPostProcessings) { - result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap,bizDataMap, result, ruleConfigFactory, protocolFactory); + result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap, bizDataMap, result, ruleConfigFactory, protocolFactory); } + //保存解析的业务字段信息到内存 ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); - //存储数据 - datagramEventProvider.storeData(bizDataMap); + //9、数据订阅 + datagramEventProvider.subscribeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 ProtocolProcessEventListener.setTask(devcode, rex.getMessage(), 4); result = null; frozenInvalidByteBuf(byteBuf, protocolConfig); + //出现异常情况,再次匹配,丢弃无法识别的帧即其中异常的数据 LeadingCodeMatcher.rematch(byteBuf, protocolConfigProvider.getMatchList()); } finally { - //数据发送,异步,转存整体数据 - ProtocolProcessEventListener.saveData(devcode, result); + //10、数据发送,异步,转存整体数据 + ProtocolProcessEventListener.saveData(devcode); return result; } } + /** + * 将匹配不满足的数据报文进行舍弃,将之后的内容进行冻结 + * + * @param byteBuf + * @param protocolConfig + */ private void frozenInvalidByteBuf(ByteBuf byteBuf, ProtocolConfig protocolConfig) { byteBuf.resetReaderIndex(); byteBuf.readBytes(protocolConfig.getPreFix().length()); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java index 569b744..2258d38 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java @@ -23,6 +23,7 @@ /** * 流程节点事件监听器 + * 协议流程监听器,主要用于监控解析各个节点的解析内容,这些节点包括解密前的报文、解密后的明文、解析的数据、以及解析过程中出现的异常情况 */ public class ProtocolProcessEventListener { @@ -123,11 +124,11 @@ } /** - * 存库操作 + * 调用现场,将解析节点信息执行异步存库 * * @param devcode */ - public static void saveData(String devcode, ParseResult result) { + public static void saveData(String devcode) { //添加到线程池执行,异步存库 threadPool.execute( () -> { diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index 1a8e9d8..b91ed97 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -34,6 +34,8 @@ public class GenericProtocolParser extends ProtocolParserSupport implements ProtocolParser, ReplyCommandEnum { /** + * 方法中最重要的节点看根据序号查看 + *

* 标准数据报文的核心解析流程方法 * 1、前导码匹配报文协议 * 2、构建协议工厂(初始化获取与协议有关的数据库配置) @@ -48,9 +50,9 @@ */ @Override public ParseResult doParseProtocol(ByteBuf byteBuf, ProtocolConfigProvider protocolConfigProvider) { - //匹配前导码 + //1、匹配前导码,获取解析协议 ProtocolConfig protocolConfig = LeadingCodeMatcher.matchFrameLeadingCode(byteBuf, protocolConfigProvider); - //如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 + //健壮性校验,如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 if (ObjectUtil.isEmpty(protocolConfig)) { return null; } @@ -61,7 +63,7 @@ Map parseFixedDataMap = getParseFixedDataMap(protocolFactory, byteBuf); try { devcode = (String) parseFixedDataMap.get("devcode"); - // 通过协议工厂匹配,匹配规则,获取规则配置 + //2、通过协议工厂匹配,匹配规则,获取规则配置 RuleConfig ruleConfig = getRuleConfig(protocolFactory, parseFixedDataMap); if (ObjectUtil.isEmpty(ruleConfig)) { //打印源数据,设备编号 @@ -74,19 +76,19 @@ } //创建规则相关的工厂,流程实例、字节解析、组合字段解析、字段解析规则等有关业务解析配置 AbstractRuleConfigFactory ruleConfigFactory = new DefaultRuleFactory(ruleConfig.getId()); - //获取流程实例配置 + //3、获取流程实例配置,判断配置是否完整正确 ProcessorInstanceProvider datagramEventProvider = ruleConfigFactory.getDatagramEventProvider(); DatagramEventConfig datagramEventConfig = datagramEventProvider.getProcessorInstance(); Assert.isFalse(Objects.isNull(datagramEventConfig), () -> { throw new EngineException(EngineExceptionEnum.PROTOCOL_INSTANCE_NULL); }); - //处理粘包拆包的主要组合 + //4、处理粘包拆包的主要组合,通过帧结构的进行处理 List frameStructDispenserList = ClazzUtil.getSubClassList(FrameStructMatcher.class, true); ByteBuf intactMessageByte = null; - //通过匹配帧结构获取完整的数据包,验证是否是一个完整的帧结构 + //通过匹配帧结构判断是否获取了完整的数据包,没有则挂起或舍弃 for (FrameStructMatcher frameStructMatcher : frameStructDispenserList) { //帧结构该协议,经过帧结构判断选定帧协议完整的数据报文 - if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory,datagramEventProvider, parseFixedDataMap)) != null) { + if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory, datagramEventProvider, parseFixedDataMap)) != null) { break; } } @@ -97,45 +99,50 @@ } //获取报文的业务内容 ByteBuf bizDataByteBuf = protocolFactory.getProtocolFieldConfigProvider().getDataContentBuf(intactMessageByte); - //解密前的报文 - log.debug("解析的密文是----------" + ByteBufUtil.hexDump(bizDataByteBuf)); - //ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(bizDataByteBuf), 1); - //密文解析 + //5、业务内容密文解析 ByteBuf clearZeroPlainBuf = datagramEventProvider.getSafeDatagram(bizDataByteBuf, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); log.debug("解析的明文是----------" + ByteBufUtil.hexDump(clearZeroPlainBuf)); + //保存解密后报文信息到内存,ProtocolProcessEventListene协议流程监听器,主要 ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(clearZeroPlainBuf), 2); - //解密后报文 - //解析组合业务字段 + //6、解析tag业务字段 ruleConfigFactory.getCombinedFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, ruleConfigFactory.getFieldConfigProvider().getFieldConfigsMap(), protocolConfig.getFieldRuleConfigMap()); //解析单个业务字段 ruleConfigFactory.getFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, protocolConfig.getFieldRuleConfigMap()); - //构建发送解析任务 + //7、构建解析的数据集合 List> bizDataMap = buildStoreData(ruleConfigFactory, protocolFactory); //定制字段处理,例如分别解析的日期需要进行日期的和合并 invokeFieldPostProcessing(bizDataMap); - //根据解析内容,判断回复指令,构建解析返回内容 + //8、根据解析业务内容,判断回复指令,为构建解析返回内容做准备 List replyCommandPostProcessings = ClazzUtil.getSubClassList(AbstractReplyCommandPostProcessing.class, false); for (AbstractReplyCommandPostProcessing replyCommandPostProcessing : replyCommandPostProcessings) { - result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap,bizDataMap, result, ruleConfigFactory, protocolFactory); + result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap, bizDataMap, result, ruleConfigFactory, protocolFactory); } + //保存解析的业务字段信息到内存 ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); - //存储数据 - datagramEventProvider.storeData(bizDataMap); + //9、数据订阅 + datagramEventProvider.subscribeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 ProtocolProcessEventListener.setTask(devcode, rex.getMessage(), 4); result = null; frozenInvalidByteBuf(byteBuf, protocolConfig); + //出现异常情况,再次匹配,丢弃无法识别的帧即其中异常的数据 LeadingCodeMatcher.rematch(byteBuf, protocolConfigProvider.getMatchList()); } finally { - //数据发送,异步,转存整体数据 - ProtocolProcessEventListener.saveData(devcode, result); + //10、数据发送,异步,转存整体数据 + ProtocolProcessEventListener.saveData(devcode); return result; } } + /** + * 将匹配不满足的数据报文进行舍弃,将之后的内容进行冻结 + * + * @param byteBuf + * @param protocolConfig + */ private void frozenInvalidByteBuf(ByteBuf byteBuf, ProtocolConfig protocolConfig) { byteBuf.resetReaderIndex(); byteBuf.readBytes(protocolConfig.getPreFix().length()); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java index 014140a..387695a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java @@ -23,13 +23,15 @@ import java.util.regex.Pattern; /** - * SensorhubDecoder 解码器 + * @author cz + * @date 2024 + * SensorhubDecoder */ @Slf4j public class SensorhubDecoder extends ByteToMessageDecoder { /** - * 自定义协议解析 + * 解码器,这里decode是数据解析前的预处理和解析的入口 * 帧解码分为以下阶段 * 1、帧预处理,判断是否是标准的报文结构,可以通过各个平台的特点,进行拦截预处理,同时根据特点进行处理粘包问题,获取标准的报文 * 2、将标准的报文,调用通用协议处理解析器,进行协议解析处理 @@ -41,6 +43,7 @@ List abstractPreProcessingList = ClazzUtil.getSubClassList(AbstractPretreatment.class, true); ByteBuf standardByteBuf = buffer; String oldBuff = ByteBufUtil.hexDump(standardByteBuf); + //预处理电信平台发来base64等流程信息,为完成帧结构解析报文处理的标准化做准备 for (AbstractPretreatment abstractPretreatment : abstractPreProcessingList) { standardByteBuf = abstractPretreatment.decode(standardByteBuf); } @@ -50,30 +53,22 @@ ProtocolParser protocolParser = new GenericProtocolParser(); ProtocolConfigProvider protocolConfigProvider = new ProtocolConfigProvider(); ParseResult parseResult = null; -// DirectMemoryReporter memoryReporter=new DirectMemoryReporter(); //无论什么情况都交给,这里组装的内容,在回复的时候有效使用 Integer pre = 0; + //防止出现一个byteBuf出现多种帧的情况,这里采取了截取的方式,即指针偏移,获取帧返回结构即可进行下个阶段 while (parseResult == null && standardByteBuf.readerIndex() != standardByteBuf.writerIndex()) { pre = standardByteBuf.readerIndex(); + //数据解析入口,doParseProtocol是数据包解析的主要是流程方法 parseResult = protocolParser.doParseProtocol(standardByteBuf, protocolConfigProvider); //避免死循环 if (pre == standardByteBuf.readerIndex()) { break; } } -// destroy(standardByteBuf); if (parseResult != null) { parseResult.setRequestCode(pretreatmentStatus); list.add(parseResult); } } - -// public void destroy(ByteBuf byteBuf) { -// if (byteBuf != null && byteBuf.refCnt() > 0) { -// byteBuf.release(); -// byteBuf = null; -// } -// } - } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java index 569b744..2258d38 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java @@ -23,6 +23,7 @@ /** * 流程节点事件监听器 + * 协议流程监听器,主要用于监控解析各个节点的解析内容,这些节点包括解密前的报文、解密后的明文、解析的数据、以及解析过程中出现的异常情况 */ public class ProtocolProcessEventListener { @@ -123,11 +124,11 @@ } /** - * 存库操作 + * 调用现场,将解析节点信息执行异步存库 * * @param devcode */ - public static void saveData(String devcode, ParseResult result) { + public static void saveData(String devcode) { //添加到线程池执行,异步存库 threadPool.execute( () -> { diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index 1a8e9d8..b91ed97 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -34,6 +34,8 @@ public class GenericProtocolParser extends ProtocolParserSupport implements ProtocolParser, ReplyCommandEnum { /** + * 方法中最重要的节点看根据序号查看 + *

* 标准数据报文的核心解析流程方法 * 1、前导码匹配报文协议 * 2、构建协议工厂(初始化获取与协议有关的数据库配置) @@ -48,9 +50,9 @@ */ @Override public ParseResult doParseProtocol(ByteBuf byteBuf, ProtocolConfigProvider protocolConfigProvider) { - //匹配前导码 + //1、匹配前导码,获取解析协议 ProtocolConfig protocolConfig = LeadingCodeMatcher.matchFrameLeadingCode(byteBuf, protocolConfigProvider); - //如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 + //健壮性校验,如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 if (ObjectUtil.isEmpty(protocolConfig)) { return null; } @@ -61,7 +63,7 @@ Map parseFixedDataMap = getParseFixedDataMap(protocolFactory, byteBuf); try { devcode = (String) parseFixedDataMap.get("devcode"); - // 通过协议工厂匹配,匹配规则,获取规则配置 + //2、通过协议工厂匹配,匹配规则,获取规则配置 RuleConfig ruleConfig = getRuleConfig(protocolFactory, parseFixedDataMap); if (ObjectUtil.isEmpty(ruleConfig)) { //打印源数据,设备编号 @@ -74,19 +76,19 @@ } //创建规则相关的工厂,流程实例、字节解析、组合字段解析、字段解析规则等有关业务解析配置 AbstractRuleConfigFactory ruleConfigFactory = new DefaultRuleFactory(ruleConfig.getId()); - //获取流程实例配置 + //3、获取流程实例配置,判断配置是否完整正确 ProcessorInstanceProvider datagramEventProvider = ruleConfigFactory.getDatagramEventProvider(); DatagramEventConfig datagramEventConfig = datagramEventProvider.getProcessorInstance(); Assert.isFalse(Objects.isNull(datagramEventConfig), () -> { throw new EngineException(EngineExceptionEnum.PROTOCOL_INSTANCE_NULL); }); - //处理粘包拆包的主要组合 + //4、处理粘包拆包的主要组合,通过帧结构的进行处理 List frameStructDispenserList = ClazzUtil.getSubClassList(FrameStructMatcher.class, true); ByteBuf intactMessageByte = null; - //通过匹配帧结构获取完整的数据包,验证是否是一个完整的帧结构 + //通过匹配帧结构判断是否获取了完整的数据包,没有则挂起或舍弃 for (FrameStructMatcher frameStructMatcher : frameStructDispenserList) { //帧结构该协议,经过帧结构判断选定帧协议完整的数据报文 - if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory,datagramEventProvider, parseFixedDataMap)) != null) { + if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory, datagramEventProvider, parseFixedDataMap)) != null) { break; } } @@ -97,45 +99,50 @@ } //获取报文的业务内容 ByteBuf bizDataByteBuf = protocolFactory.getProtocolFieldConfigProvider().getDataContentBuf(intactMessageByte); - //解密前的报文 - log.debug("解析的密文是----------" + ByteBufUtil.hexDump(bizDataByteBuf)); - //ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(bizDataByteBuf), 1); - //密文解析 + //5、业务内容密文解析 ByteBuf clearZeroPlainBuf = datagramEventProvider.getSafeDatagram(bizDataByteBuf, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); log.debug("解析的明文是----------" + ByteBufUtil.hexDump(clearZeroPlainBuf)); + //保存解密后报文信息到内存,ProtocolProcessEventListene协议流程监听器,主要 ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(clearZeroPlainBuf), 2); - //解密后报文 - //解析组合业务字段 + //6、解析tag业务字段 ruleConfigFactory.getCombinedFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, ruleConfigFactory.getFieldConfigProvider().getFieldConfigsMap(), protocolConfig.getFieldRuleConfigMap()); //解析单个业务字段 ruleConfigFactory.getFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, protocolConfig.getFieldRuleConfigMap()); - //构建发送解析任务 + //7、构建解析的数据集合 List> bizDataMap = buildStoreData(ruleConfigFactory, protocolFactory); //定制字段处理,例如分别解析的日期需要进行日期的和合并 invokeFieldPostProcessing(bizDataMap); - //根据解析内容,判断回复指令,构建解析返回内容 + //8、根据解析业务内容,判断回复指令,为构建解析返回内容做准备 List replyCommandPostProcessings = ClazzUtil.getSubClassList(AbstractReplyCommandPostProcessing.class, false); for (AbstractReplyCommandPostProcessing replyCommandPostProcessing : replyCommandPostProcessings) { - result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap,bizDataMap, result, ruleConfigFactory, protocolFactory); + result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap, bizDataMap, result, ruleConfigFactory, protocolFactory); } + //保存解析的业务字段信息到内存 ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); - //存储数据 - datagramEventProvider.storeData(bizDataMap); + //9、数据订阅 + datagramEventProvider.subscribeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 ProtocolProcessEventListener.setTask(devcode, rex.getMessage(), 4); result = null; frozenInvalidByteBuf(byteBuf, protocolConfig); + //出现异常情况,再次匹配,丢弃无法识别的帧即其中异常的数据 LeadingCodeMatcher.rematch(byteBuf, protocolConfigProvider.getMatchList()); } finally { - //数据发送,异步,转存整体数据 - ProtocolProcessEventListener.saveData(devcode, result); + //10、数据发送,异步,转存整体数据 + ProtocolProcessEventListener.saveData(devcode); return result; } } + /** + * 将匹配不满足的数据报文进行舍弃,将之后的内容进行冻结 + * + * @param byteBuf + * @param protocolConfig + */ private void frozenInvalidByteBuf(ByteBuf byteBuf, ProtocolConfig protocolConfig) { byteBuf.resetReaderIndex(); byteBuf.readBytes(protocolConfig.getPreFix().length()); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java index 014140a..387695a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java @@ -23,13 +23,15 @@ import java.util.regex.Pattern; /** - * SensorhubDecoder 解码器 + * @author cz + * @date 2024 + * SensorhubDecoder */ @Slf4j public class SensorhubDecoder extends ByteToMessageDecoder { /** - * 自定义协议解析 + * 解码器,这里decode是数据解析前的预处理和解析的入口 * 帧解码分为以下阶段 * 1、帧预处理,判断是否是标准的报文结构,可以通过各个平台的特点,进行拦截预处理,同时根据特点进行处理粘包问题,获取标准的报文 * 2、将标准的报文,调用通用协议处理解析器,进行协议解析处理 @@ -41,6 +43,7 @@ List abstractPreProcessingList = ClazzUtil.getSubClassList(AbstractPretreatment.class, true); ByteBuf standardByteBuf = buffer; String oldBuff = ByteBufUtil.hexDump(standardByteBuf); + //预处理电信平台发来base64等流程信息,为完成帧结构解析报文处理的标准化做准备 for (AbstractPretreatment abstractPretreatment : abstractPreProcessingList) { standardByteBuf = abstractPretreatment.decode(standardByteBuf); } @@ -50,30 +53,22 @@ ProtocolParser protocolParser = new GenericProtocolParser(); ProtocolConfigProvider protocolConfigProvider = new ProtocolConfigProvider(); ParseResult parseResult = null; -// DirectMemoryReporter memoryReporter=new DirectMemoryReporter(); //无论什么情况都交给,这里组装的内容,在回复的时候有效使用 Integer pre = 0; + //防止出现一个byteBuf出现多种帧的情况,这里采取了截取的方式,即指针偏移,获取帧返回结构即可进行下个阶段 while (parseResult == null && standardByteBuf.readerIndex() != standardByteBuf.writerIndex()) { pre = standardByteBuf.readerIndex(); + //数据解析入口,doParseProtocol是数据包解析的主要是流程方法 parseResult = protocolParser.doParseProtocol(standardByteBuf, protocolConfigProvider); //避免死循环 if (pre == standardByteBuf.readerIndex()) { break; } } -// destroy(standardByteBuf); if (parseResult != null) { parseResult.setRequestCode(pretreatmentStatus); list.add(parseResult); } } - -// public void destroy(ByteBuf byteBuf) { -// if (byteBuf != null && byteBuf.refCnt() > 0) { -// byteBuf.release(); -// byteBuf = null; -// } -// } - } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java index 3e9160f..44d22f1 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java @@ -43,7 +43,7 @@ /** * 查询匹配位置,进行再次匹配 - * 再次匹配 如何丢弃无法识别的帧, + * 再次匹配,丢弃无法识别的帧, *

* 如果是拆包序列2进入,需要重置byteBuf的读位置,进行重新匹配 * 匹配布上的数据 diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java index 569b744..2258d38 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java @@ -23,6 +23,7 @@ /** * 流程节点事件监听器 + * 协议流程监听器,主要用于监控解析各个节点的解析内容,这些节点包括解密前的报文、解密后的明文、解析的数据、以及解析过程中出现的异常情况 */ public class ProtocolProcessEventListener { @@ -123,11 +124,11 @@ } /** - * 存库操作 + * 调用现场,将解析节点信息执行异步存库 * * @param devcode */ - public static void saveData(String devcode, ParseResult result) { + public static void saveData(String devcode) { //添加到线程池执行,异步存库 threadPool.execute( () -> { diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index 1a8e9d8..b91ed97 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -34,6 +34,8 @@ public class GenericProtocolParser extends ProtocolParserSupport implements ProtocolParser, ReplyCommandEnum { /** + * 方法中最重要的节点看根据序号查看 + *

* 标准数据报文的核心解析流程方法 * 1、前导码匹配报文协议 * 2、构建协议工厂(初始化获取与协议有关的数据库配置) @@ -48,9 +50,9 @@ */ @Override public ParseResult doParseProtocol(ByteBuf byteBuf, ProtocolConfigProvider protocolConfigProvider) { - //匹配前导码 + //1、匹配前导码,获取解析协议 ProtocolConfig protocolConfig = LeadingCodeMatcher.matchFrameLeadingCode(byteBuf, protocolConfigProvider); - //如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 + //健壮性校验,如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 if (ObjectUtil.isEmpty(protocolConfig)) { return null; } @@ -61,7 +63,7 @@ Map parseFixedDataMap = getParseFixedDataMap(protocolFactory, byteBuf); try { devcode = (String) parseFixedDataMap.get("devcode"); - // 通过协议工厂匹配,匹配规则,获取规则配置 + //2、通过协议工厂匹配,匹配规则,获取规则配置 RuleConfig ruleConfig = getRuleConfig(protocolFactory, parseFixedDataMap); if (ObjectUtil.isEmpty(ruleConfig)) { //打印源数据,设备编号 @@ -74,19 +76,19 @@ } //创建规则相关的工厂,流程实例、字节解析、组合字段解析、字段解析规则等有关业务解析配置 AbstractRuleConfigFactory ruleConfigFactory = new DefaultRuleFactory(ruleConfig.getId()); - //获取流程实例配置 + //3、获取流程实例配置,判断配置是否完整正确 ProcessorInstanceProvider datagramEventProvider = ruleConfigFactory.getDatagramEventProvider(); DatagramEventConfig datagramEventConfig = datagramEventProvider.getProcessorInstance(); Assert.isFalse(Objects.isNull(datagramEventConfig), () -> { throw new EngineException(EngineExceptionEnum.PROTOCOL_INSTANCE_NULL); }); - //处理粘包拆包的主要组合 + //4、处理粘包拆包的主要组合,通过帧结构的进行处理 List frameStructDispenserList = ClazzUtil.getSubClassList(FrameStructMatcher.class, true); ByteBuf intactMessageByte = null; - //通过匹配帧结构获取完整的数据包,验证是否是一个完整的帧结构 + //通过匹配帧结构判断是否获取了完整的数据包,没有则挂起或舍弃 for (FrameStructMatcher frameStructMatcher : frameStructDispenserList) { //帧结构该协议,经过帧结构判断选定帧协议完整的数据报文 - if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory,datagramEventProvider, parseFixedDataMap)) != null) { + if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory, datagramEventProvider, parseFixedDataMap)) != null) { break; } } @@ -97,45 +99,50 @@ } //获取报文的业务内容 ByteBuf bizDataByteBuf = protocolFactory.getProtocolFieldConfigProvider().getDataContentBuf(intactMessageByte); - //解密前的报文 - log.debug("解析的密文是----------" + ByteBufUtil.hexDump(bizDataByteBuf)); - //ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(bizDataByteBuf), 1); - //密文解析 + //5、业务内容密文解析 ByteBuf clearZeroPlainBuf = datagramEventProvider.getSafeDatagram(bizDataByteBuf, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); log.debug("解析的明文是----------" + ByteBufUtil.hexDump(clearZeroPlainBuf)); + //保存解密后报文信息到内存,ProtocolProcessEventListene协议流程监听器,主要 ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(clearZeroPlainBuf), 2); - //解密后报文 - //解析组合业务字段 + //6、解析tag业务字段 ruleConfigFactory.getCombinedFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, ruleConfigFactory.getFieldConfigProvider().getFieldConfigsMap(), protocolConfig.getFieldRuleConfigMap()); //解析单个业务字段 ruleConfigFactory.getFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, protocolConfig.getFieldRuleConfigMap()); - //构建发送解析任务 + //7、构建解析的数据集合 List> bizDataMap = buildStoreData(ruleConfigFactory, protocolFactory); //定制字段处理,例如分别解析的日期需要进行日期的和合并 invokeFieldPostProcessing(bizDataMap); - //根据解析内容,判断回复指令,构建解析返回内容 + //8、根据解析业务内容,判断回复指令,为构建解析返回内容做准备 List replyCommandPostProcessings = ClazzUtil.getSubClassList(AbstractReplyCommandPostProcessing.class, false); for (AbstractReplyCommandPostProcessing replyCommandPostProcessing : replyCommandPostProcessings) { - result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap,bizDataMap, result, ruleConfigFactory, protocolFactory); + result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap, bizDataMap, result, ruleConfigFactory, protocolFactory); } + //保存解析的业务字段信息到内存 ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); - //存储数据 - datagramEventProvider.storeData(bizDataMap); + //9、数据订阅 + datagramEventProvider.subscribeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 ProtocolProcessEventListener.setTask(devcode, rex.getMessage(), 4); result = null; frozenInvalidByteBuf(byteBuf, protocolConfig); + //出现异常情况,再次匹配,丢弃无法识别的帧即其中异常的数据 LeadingCodeMatcher.rematch(byteBuf, protocolConfigProvider.getMatchList()); } finally { - //数据发送,异步,转存整体数据 - ProtocolProcessEventListener.saveData(devcode, result); + //10、数据发送,异步,转存整体数据 + ProtocolProcessEventListener.saveData(devcode); return result; } } + /** + * 将匹配不满足的数据报文进行舍弃,将之后的内容进行冻结 + * + * @param byteBuf + * @param protocolConfig + */ private void frozenInvalidByteBuf(ByteBuf byteBuf, ProtocolConfig protocolConfig) { byteBuf.resetReaderIndex(); byteBuf.readBytes(protocolConfig.getPreFix().length()); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java index 014140a..387695a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java @@ -23,13 +23,15 @@ import java.util.regex.Pattern; /** - * SensorhubDecoder 解码器 + * @author cz + * @date 2024 + * SensorhubDecoder */ @Slf4j public class SensorhubDecoder extends ByteToMessageDecoder { /** - * 自定义协议解析 + * 解码器,这里decode是数据解析前的预处理和解析的入口 * 帧解码分为以下阶段 * 1、帧预处理,判断是否是标准的报文结构,可以通过各个平台的特点,进行拦截预处理,同时根据特点进行处理粘包问题,获取标准的报文 * 2、将标准的报文,调用通用协议处理解析器,进行协议解析处理 @@ -41,6 +43,7 @@ List abstractPreProcessingList = ClazzUtil.getSubClassList(AbstractPretreatment.class, true); ByteBuf standardByteBuf = buffer; String oldBuff = ByteBufUtil.hexDump(standardByteBuf); + //预处理电信平台发来base64等流程信息,为完成帧结构解析报文处理的标准化做准备 for (AbstractPretreatment abstractPretreatment : abstractPreProcessingList) { standardByteBuf = abstractPretreatment.decode(standardByteBuf); } @@ -50,30 +53,22 @@ ProtocolParser protocolParser = new GenericProtocolParser(); ProtocolConfigProvider protocolConfigProvider = new ProtocolConfigProvider(); ParseResult parseResult = null; -// DirectMemoryReporter memoryReporter=new DirectMemoryReporter(); //无论什么情况都交给,这里组装的内容,在回复的时候有效使用 Integer pre = 0; + //防止出现一个byteBuf出现多种帧的情况,这里采取了截取的方式,即指针偏移,获取帧返回结构即可进行下个阶段 while (parseResult == null && standardByteBuf.readerIndex() != standardByteBuf.writerIndex()) { pre = standardByteBuf.readerIndex(); + //数据解析入口,doParseProtocol是数据包解析的主要是流程方法 parseResult = protocolParser.doParseProtocol(standardByteBuf, protocolConfigProvider); //避免死循环 if (pre == standardByteBuf.readerIndex()) { break; } } -// destroy(standardByteBuf); if (parseResult != null) { parseResult.setRequestCode(pretreatmentStatus); list.add(parseResult); } } - -// public void destroy(ByteBuf byteBuf) { -// if (byteBuf != null && byteBuf.refCnt() > 0) { -// byteBuf.release(); -// byteBuf = null; -// } -// } - } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java index 3e9160f..44d22f1 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java @@ -43,7 +43,7 @@ /** * 查询匹配位置,进行再次匹配 - * 再次匹配 如何丢弃无法识别的帧, + * 再次匹配,丢弃无法识别的帧, *

* 如果是拆包序列2进入,需要重置byteBuf的读位置,进行重新匹配 * 匹配布上的数据 diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java index bab0601..e764e3a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java @@ -3,6 +3,7 @@ import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.casic.missiles.dto.FrameNodeStr; import com.casic.missiles.factory.AbstractProtocolConfigFactory; import com.casic.missiles.parser.matcher.FrameStructMatchSupport; import com.casic.missiles.parser.matcher.FrameStructMatcher; @@ -53,18 +54,21 @@ ByteBuf intactMessageByte = null; //暂时实现不进行跳帧的情况,都为临近的帧数据内容 if (!ObjectUtil.isEmpty(unpackFlag)) { + //匹配帧长度 ByteBuf matchByteBuf = matchLength(byteBuf, datagramEventProvide.getProcessorInstance().getSafeLength(), protocolFactory); //取到一个完整的帧 while (matchByteBuf != null) { unpackFlag = protocolFieldConfigProvider.getProtocolFieldValue(protocolConfig.getUnpackId(), matchByteBuf, protocolConfig.getFieldRuleConfigMap()); - //后续标志位结束 + //后续标志位结束,表示当前帧可以解析 if (unpackFlag == 1) { - //表示可以截取 + //帧结构合并判断 intactMessageByte = mergeMarkFrame(matchByteBuf, protocolFactory, datagramEventProvide, parseFixedDataMap); return intactMessageByte; } else { + //表示当前帧不可以解析,需要进行挂起操作 storeHalfPackBuf(protocolFactory, matchByteBuf); } + //前一帧没有解析,当前流可以读取,需再次进行匹配 matchByteBuf = matchLength(byteBuf, datagramEventProvide.getProcessorInstance().getSafeLength(), protocolFactory); } } @@ -79,25 +83,35 @@ * 2、根据帧结构位置信息,合并相关的帧信息 */ private ByteBuf mergeMarkFrame(ByteBuf byteBufContent, AbstractProtocolConfigFactory protocolFactory, ProcessorInstanceProvider datagramEventProvider, Map parseFixedDataMap) { - ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); - Map currentFrameFixedProperty = protocolFactory.getProtocolFieldConfigProvider().getFixedProperty(byteBufContent, protocolConfig); //重置 ByteBuf mergeWholeFrameByte = ByteBufAllocator.DEFAULT.buffer(); - String tail = "", headFrame = "", bizFrameStr = ""; log.debug("--合并前" + ByteBufUtil.hexDump(byteBufContent)); - String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); - List sortList = getStoreHalfPackBuf(key); - //添加当前的帧信息 - sortList.add(MatchDataStore.ExpiringByteBuf.builder() - .byteBuf(byteBufContent) - .tailPosition(currentFrameFixedProperty.get(TAIL_POSITION)) - .fixPosition(currentFrameFixedProperty.get(FIXED_POSITION)) - .fixedStoreMap(parseFixedDataMap) - .build()); - //进行seq排序 - sortList = sortList.stream().sorted( - (e1, e2) -> ((Integer) e1.getFixedStoreMap().get(SEQ)).compareTo((Integer) e2.getFixedStoreMap().get(SEQ)) - ).collect(Collectors.toList()); + //获取挂起的帧并按seq进行排序,*******************这个seq属于业务定制,后续可以酌情修改 + List sortList = getHangUpFrame(protocolFactory, parseFixedDataMap, byteBufContent); + //对帧结构进行头、内容、尾划分,执行帧合并操作 + FrameNodeStr frameNodeStr = doMergeMarkFrame(sortList, protocolFactory, datagramEventProvider); + ByteBuf bizFrameByte = protocolFactory.getProtocolFieldConfigProvider().setDataContentBuf(frameNodeStr.getFrameBody(), frameNodeStr.getFrameHeader(), frameNodeStr.getFrameTail()); + //增加头 + mergeWholeFrameByte.writeBytes(Hex.decode(frameNodeStr.getFrameHeader())); + //增加业务内容 + mergeWholeFrameByte.writeBytes(bizFrameByte); + //增加尾部 + mergeWholeFrameByte.writeBytes(Hex.decode(frameNodeStr.getFrameTail())); + log.debug("--合并后--" + ByteBufUtil.hexDump(mergeWholeFrameByte)); + return mergeWholeFrameByte; + } + + /** + * 合并一小时内挂起的帧信息,如果只有当前 + * + * @param sortList + * @param protocolFactory + * @param datagramEventProvider + * @return + */ + private FrameNodeStr doMergeMarkFrame(List sortList, AbstractProtocolConfigFactory protocolFactory, ProcessorInstanceProvider datagramEventProvider) { + ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); + String frameTail = "", frameHeader = "", frameBody = ""; //会存在多个帧拼接在一起的的情况,进行合并处理 for (MatchDataStore.ExpiringByteBuf expiringByteBuf : sortList) { ByteBuf currentByteBuf = expiringByteBuf.getByteBuf(); @@ -105,29 +119,49 @@ String hexDump = ByteBufUtil.hexDump(currentByteBuf); //合并报文,需要将密文进行提前解密合并,获取当前固定帧结构的关系map Supplier fieldLengthSupplier = protocolFactory.getProtocolFieldConfigProvider().getHistoryBizFieldLength(expiringByteBuf.getFixedStoreMap(), protocolConfig); - if (StringUtils.isEmpty(bizFrameStr)) { - headFrame = hexDump.substring(0, expiringByteBuf.getFixPosition() * 2); + if (StringUtils.isEmpty(frameBody)) { + frameHeader = hexDump.substring(0, expiringByteBuf.getFixPosition() * 2); //首次获取业务数据报文,明文合并,密文合并还存在问题 String encipherHex = hexDump.substring(expiringByteBuf.getFixPosition() * 2, (expiringByteBuf.getTailPosition()) * 2); String plainText = datagramEventProvider.getPlainTextDatagram(encipherHex, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); - bizFrameStr = plainText; - tail = hexDump.substring(expiringByteBuf.getTailPosition() * 2); + frameBody = plainText; + frameTail = hexDump.substring(expiringByteBuf.getTailPosition() * 2); } else { //是否需要解密 String encipherHex = hexDump.substring(expiringByteBuf.getFixPosition() * 2, (expiringByteBuf.getTailPosition()) * 2); String plainText = datagramEventProvider.getPlainTextDatagram(encipherHex, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); - bizFrameStr += plainText; + frameBody += plainText; } } - ByteBuf bizFrameByte = protocolFactory.getProtocolFieldConfigProvider().setDataContentBuf(bizFrameStr, headFrame, tail); - //增加头 - mergeWholeFrameByte.writeBytes(Hex.decode(headFrame)); - //增加业务内容 - mergeWholeFrameByte.writeBytes(bizFrameByte); - //增加尾部 - mergeWholeFrameByte.writeBytes(Hex.decode(tail)); - log.debug("--合并后--" + ByteBufUtil.hexDump(mergeWholeFrameByte)); - return mergeWholeFrameByte; + return FrameNodeStr.builder() + .frameHeader(frameHeader) + .frameBody(frameBody) + .frameTail(frameTail) + .build(); + } + + /** + * 通过设备编号获取挂起的帧 + * + * @param protocolFactory + * @param parseFixedDataMap + * @param byteBufContent + * @return + */ + private List getHangUpFrame(AbstractProtocolConfigFactory protocolFactory, Map parseFixedDataMap, ByteBuf byteBufContent) { + ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); + String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); + //获取挂起的半包帧结构,并排序 + List sortList = getStoreHalfPackBuf(key); + Map currentFrameFixedProperty = protocolFactory.getProtocolFieldConfigProvider().getFixedProperty(byteBufContent, protocolConfig); + //添加当前的帧信息 + sortList.add(MatchDataStore.ExpiringByteBuf.builder() + .byteBuf(byteBufContent) + .tailPosition(currentFrameFixedProperty.get(TAIL_POSITION)) + .fixPosition(currentFrameFixedProperty.get(FIXED_POSITION)) + .fixedStoreMap(parseFixedDataMap) + .build()); + return sortList; } @@ -145,6 +179,7 @@ List storeList = new ArrayList<>(); String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); // key += Thread.currentThread().getName(); + //当前设备编号作为key值进行存储 if (MatchDataStore.storeMap.containsKey(key)) { storeList = MatchDataStore.storeMap.get(key); } @@ -178,6 +213,12 @@ return sortStoreHalfPackBuf(storeList); } + /** + * 执行帧排序 + * + * @param storeHalfPackBufList + * @return + */ private List sortStoreHalfPackBuf(List storeHalfPackBufList) { return storeHalfPackBufList.stream() .sorted((e1, e2) -> ((Integer) e1.getFixedStoreMap().get("seq")) diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java index 569b744..2258d38 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java @@ -23,6 +23,7 @@ /** * 流程节点事件监听器 + * 协议流程监听器,主要用于监控解析各个节点的解析内容,这些节点包括解密前的报文、解密后的明文、解析的数据、以及解析过程中出现的异常情况 */ public class ProtocolProcessEventListener { @@ -123,11 +124,11 @@ } /** - * 存库操作 + * 调用现场,将解析节点信息执行异步存库 * * @param devcode */ - public static void saveData(String devcode, ParseResult result) { + public static void saveData(String devcode) { //添加到线程池执行,异步存库 threadPool.execute( () -> { diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index 1a8e9d8..b91ed97 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -34,6 +34,8 @@ public class GenericProtocolParser extends ProtocolParserSupport implements ProtocolParser, ReplyCommandEnum { /** + * 方法中最重要的节点看根据序号查看 + *

* 标准数据报文的核心解析流程方法 * 1、前导码匹配报文协议 * 2、构建协议工厂(初始化获取与协议有关的数据库配置) @@ -48,9 +50,9 @@ */ @Override public ParseResult doParseProtocol(ByteBuf byteBuf, ProtocolConfigProvider protocolConfigProvider) { - //匹配前导码 + //1、匹配前导码,获取解析协议 ProtocolConfig protocolConfig = LeadingCodeMatcher.matchFrameLeadingCode(byteBuf, protocolConfigProvider); - //如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 + //健壮性校验,如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 if (ObjectUtil.isEmpty(protocolConfig)) { return null; } @@ -61,7 +63,7 @@ Map parseFixedDataMap = getParseFixedDataMap(protocolFactory, byteBuf); try { devcode = (String) parseFixedDataMap.get("devcode"); - // 通过协议工厂匹配,匹配规则,获取规则配置 + //2、通过协议工厂匹配,匹配规则,获取规则配置 RuleConfig ruleConfig = getRuleConfig(protocolFactory, parseFixedDataMap); if (ObjectUtil.isEmpty(ruleConfig)) { //打印源数据,设备编号 @@ -74,19 +76,19 @@ } //创建规则相关的工厂,流程实例、字节解析、组合字段解析、字段解析规则等有关业务解析配置 AbstractRuleConfigFactory ruleConfigFactory = new DefaultRuleFactory(ruleConfig.getId()); - //获取流程实例配置 + //3、获取流程实例配置,判断配置是否完整正确 ProcessorInstanceProvider datagramEventProvider = ruleConfigFactory.getDatagramEventProvider(); DatagramEventConfig datagramEventConfig = datagramEventProvider.getProcessorInstance(); Assert.isFalse(Objects.isNull(datagramEventConfig), () -> { throw new EngineException(EngineExceptionEnum.PROTOCOL_INSTANCE_NULL); }); - //处理粘包拆包的主要组合 + //4、处理粘包拆包的主要组合,通过帧结构的进行处理 List frameStructDispenserList = ClazzUtil.getSubClassList(FrameStructMatcher.class, true); ByteBuf intactMessageByte = null; - //通过匹配帧结构获取完整的数据包,验证是否是一个完整的帧结构 + //通过匹配帧结构判断是否获取了完整的数据包,没有则挂起或舍弃 for (FrameStructMatcher frameStructMatcher : frameStructDispenserList) { //帧结构该协议,经过帧结构判断选定帧协议完整的数据报文 - if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory,datagramEventProvider, parseFixedDataMap)) != null) { + if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory, datagramEventProvider, parseFixedDataMap)) != null) { break; } } @@ -97,45 +99,50 @@ } //获取报文的业务内容 ByteBuf bizDataByteBuf = protocolFactory.getProtocolFieldConfigProvider().getDataContentBuf(intactMessageByte); - //解密前的报文 - log.debug("解析的密文是----------" + ByteBufUtil.hexDump(bizDataByteBuf)); - //ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(bizDataByteBuf), 1); - //密文解析 + //5、业务内容密文解析 ByteBuf clearZeroPlainBuf = datagramEventProvider.getSafeDatagram(bizDataByteBuf, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); log.debug("解析的明文是----------" + ByteBufUtil.hexDump(clearZeroPlainBuf)); + //保存解密后报文信息到内存,ProtocolProcessEventListene协议流程监听器,主要 ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(clearZeroPlainBuf), 2); - //解密后报文 - //解析组合业务字段 + //6、解析tag业务字段 ruleConfigFactory.getCombinedFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, ruleConfigFactory.getFieldConfigProvider().getFieldConfigsMap(), protocolConfig.getFieldRuleConfigMap()); //解析单个业务字段 ruleConfigFactory.getFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, protocolConfig.getFieldRuleConfigMap()); - //构建发送解析任务 + //7、构建解析的数据集合 List> bizDataMap = buildStoreData(ruleConfigFactory, protocolFactory); //定制字段处理,例如分别解析的日期需要进行日期的和合并 invokeFieldPostProcessing(bizDataMap); - //根据解析内容,判断回复指令,构建解析返回内容 + //8、根据解析业务内容,判断回复指令,为构建解析返回内容做准备 List replyCommandPostProcessings = ClazzUtil.getSubClassList(AbstractReplyCommandPostProcessing.class, false); for (AbstractReplyCommandPostProcessing replyCommandPostProcessing : replyCommandPostProcessings) { - result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap,bizDataMap, result, ruleConfigFactory, protocolFactory); + result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap, bizDataMap, result, ruleConfigFactory, protocolFactory); } + //保存解析的业务字段信息到内存 ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); - //存储数据 - datagramEventProvider.storeData(bizDataMap); + //9、数据订阅 + datagramEventProvider.subscribeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 ProtocolProcessEventListener.setTask(devcode, rex.getMessage(), 4); result = null; frozenInvalidByteBuf(byteBuf, protocolConfig); + //出现异常情况,再次匹配,丢弃无法识别的帧即其中异常的数据 LeadingCodeMatcher.rematch(byteBuf, protocolConfigProvider.getMatchList()); } finally { - //数据发送,异步,转存整体数据 - ProtocolProcessEventListener.saveData(devcode, result); + //10、数据发送,异步,转存整体数据 + ProtocolProcessEventListener.saveData(devcode); return result; } } + /** + * 将匹配不满足的数据报文进行舍弃,将之后的内容进行冻结 + * + * @param byteBuf + * @param protocolConfig + */ private void frozenInvalidByteBuf(ByteBuf byteBuf, ProtocolConfig protocolConfig) { byteBuf.resetReaderIndex(); byteBuf.readBytes(protocolConfig.getPreFix().length()); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java index 014140a..387695a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java @@ -23,13 +23,15 @@ import java.util.regex.Pattern; /** - * SensorhubDecoder 解码器 + * @author cz + * @date 2024 + * SensorhubDecoder */ @Slf4j public class SensorhubDecoder extends ByteToMessageDecoder { /** - * 自定义协议解析 + * 解码器,这里decode是数据解析前的预处理和解析的入口 * 帧解码分为以下阶段 * 1、帧预处理,判断是否是标准的报文结构,可以通过各个平台的特点,进行拦截预处理,同时根据特点进行处理粘包问题,获取标准的报文 * 2、将标准的报文,调用通用协议处理解析器,进行协议解析处理 @@ -41,6 +43,7 @@ List abstractPreProcessingList = ClazzUtil.getSubClassList(AbstractPretreatment.class, true); ByteBuf standardByteBuf = buffer; String oldBuff = ByteBufUtil.hexDump(standardByteBuf); + //预处理电信平台发来base64等流程信息,为完成帧结构解析报文处理的标准化做准备 for (AbstractPretreatment abstractPretreatment : abstractPreProcessingList) { standardByteBuf = abstractPretreatment.decode(standardByteBuf); } @@ -50,30 +53,22 @@ ProtocolParser protocolParser = new GenericProtocolParser(); ProtocolConfigProvider protocolConfigProvider = new ProtocolConfigProvider(); ParseResult parseResult = null; -// DirectMemoryReporter memoryReporter=new DirectMemoryReporter(); //无论什么情况都交给,这里组装的内容,在回复的时候有效使用 Integer pre = 0; + //防止出现一个byteBuf出现多种帧的情况,这里采取了截取的方式,即指针偏移,获取帧返回结构即可进行下个阶段 while (parseResult == null && standardByteBuf.readerIndex() != standardByteBuf.writerIndex()) { pre = standardByteBuf.readerIndex(); + //数据解析入口,doParseProtocol是数据包解析的主要是流程方法 parseResult = protocolParser.doParseProtocol(standardByteBuf, protocolConfigProvider); //避免死循环 if (pre == standardByteBuf.readerIndex()) { break; } } -// destroy(standardByteBuf); if (parseResult != null) { parseResult.setRequestCode(pretreatmentStatus); list.add(parseResult); } } - -// public void destroy(ByteBuf byteBuf) { -// if (byteBuf != null && byteBuf.refCnt() > 0) { -// byteBuf.release(); -// byteBuf = null; -// } -// } - } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java index 3e9160f..44d22f1 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java @@ -43,7 +43,7 @@ /** * 查询匹配位置,进行再次匹配 - * 再次匹配 如何丢弃无法识别的帧, + * 再次匹配,丢弃无法识别的帧, *

* 如果是拆包序列2进入,需要重置byteBuf的读位置,进行重新匹配 * 匹配布上的数据 diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java index bab0601..e764e3a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java @@ -3,6 +3,7 @@ import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.casic.missiles.dto.FrameNodeStr; import com.casic.missiles.factory.AbstractProtocolConfigFactory; import com.casic.missiles.parser.matcher.FrameStructMatchSupport; import com.casic.missiles.parser.matcher.FrameStructMatcher; @@ -53,18 +54,21 @@ ByteBuf intactMessageByte = null; //暂时实现不进行跳帧的情况,都为临近的帧数据内容 if (!ObjectUtil.isEmpty(unpackFlag)) { + //匹配帧长度 ByteBuf matchByteBuf = matchLength(byteBuf, datagramEventProvide.getProcessorInstance().getSafeLength(), protocolFactory); //取到一个完整的帧 while (matchByteBuf != null) { unpackFlag = protocolFieldConfigProvider.getProtocolFieldValue(protocolConfig.getUnpackId(), matchByteBuf, protocolConfig.getFieldRuleConfigMap()); - //后续标志位结束 + //后续标志位结束,表示当前帧可以解析 if (unpackFlag == 1) { - //表示可以截取 + //帧结构合并判断 intactMessageByte = mergeMarkFrame(matchByteBuf, protocolFactory, datagramEventProvide, parseFixedDataMap); return intactMessageByte; } else { + //表示当前帧不可以解析,需要进行挂起操作 storeHalfPackBuf(protocolFactory, matchByteBuf); } + //前一帧没有解析,当前流可以读取,需再次进行匹配 matchByteBuf = matchLength(byteBuf, datagramEventProvide.getProcessorInstance().getSafeLength(), protocolFactory); } } @@ -79,25 +83,35 @@ * 2、根据帧结构位置信息,合并相关的帧信息 */ private ByteBuf mergeMarkFrame(ByteBuf byteBufContent, AbstractProtocolConfigFactory protocolFactory, ProcessorInstanceProvider datagramEventProvider, Map parseFixedDataMap) { - ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); - Map currentFrameFixedProperty = protocolFactory.getProtocolFieldConfigProvider().getFixedProperty(byteBufContent, protocolConfig); //重置 ByteBuf mergeWholeFrameByte = ByteBufAllocator.DEFAULT.buffer(); - String tail = "", headFrame = "", bizFrameStr = ""; log.debug("--合并前" + ByteBufUtil.hexDump(byteBufContent)); - String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); - List sortList = getStoreHalfPackBuf(key); - //添加当前的帧信息 - sortList.add(MatchDataStore.ExpiringByteBuf.builder() - .byteBuf(byteBufContent) - .tailPosition(currentFrameFixedProperty.get(TAIL_POSITION)) - .fixPosition(currentFrameFixedProperty.get(FIXED_POSITION)) - .fixedStoreMap(parseFixedDataMap) - .build()); - //进行seq排序 - sortList = sortList.stream().sorted( - (e1, e2) -> ((Integer) e1.getFixedStoreMap().get(SEQ)).compareTo((Integer) e2.getFixedStoreMap().get(SEQ)) - ).collect(Collectors.toList()); + //获取挂起的帧并按seq进行排序,*******************这个seq属于业务定制,后续可以酌情修改 + List sortList = getHangUpFrame(protocolFactory, parseFixedDataMap, byteBufContent); + //对帧结构进行头、内容、尾划分,执行帧合并操作 + FrameNodeStr frameNodeStr = doMergeMarkFrame(sortList, protocolFactory, datagramEventProvider); + ByteBuf bizFrameByte = protocolFactory.getProtocolFieldConfigProvider().setDataContentBuf(frameNodeStr.getFrameBody(), frameNodeStr.getFrameHeader(), frameNodeStr.getFrameTail()); + //增加头 + mergeWholeFrameByte.writeBytes(Hex.decode(frameNodeStr.getFrameHeader())); + //增加业务内容 + mergeWholeFrameByte.writeBytes(bizFrameByte); + //增加尾部 + mergeWholeFrameByte.writeBytes(Hex.decode(frameNodeStr.getFrameTail())); + log.debug("--合并后--" + ByteBufUtil.hexDump(mergeWholeFrameByte)); + return mergeWholeFrameByte; + } + + /** + * 合并一小时内挂起的帧信息,如果只有当前 + * + * @param sortList + * @param protocolFactory + * @param datagramEventProvider + * @return + */ + private FrameNodeStr doMergeMarkFrame(List sortList, AbstractProtocolConfigFactory protocolFactory, ProcessorInstanceProvider datagramEventProvider) { + ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); + String frameTail = "", frameHeader = "", frameBody = ""; //会存在多个帧拼接在一起的的情况,进行合并处理 for (MatchDataStore.ExpiringByteBuf expiringByteBuf : sortList) { ByteBuf currentByteBuf = expiringByteBuf.getByteBuf(); @@ -105,29 +119,49 @@ String hexDump = ByteBufUtil.hexDump(currentByteBuf); //合并报文,需要将密文进行提前解密合并,获取当前固定帧结构的关系map Supplier fieldLengthSupplier = protocolFactory.getProtocolFieldConfigProvider().getHistoryBizFieldLength(expiringByteBuf.getFixedStoreMap(), protocolConfig); - if (StringUtils.isEmpty(bizFrameStr)) { - headFrame = hexDump.substring(0, expiringByteBuf.getFixPosition() * 2); + if (StringUtils.isEmpty(frameBody)) { + frameHeader = hexDump.substring(0, expiringByteBuf.getFixPosition() * 2); //首次获取业务数据报文,明文合并,密文合并还存在问题 String encipherHex = hexDump.substring(expiringByteBuf.getFixPosition() * 2, (expiringByteBuf.getTailPosition()) * 2); String plainText = datagramEventProvider.getPlainTextDatagram(encipherHex, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); - bizFrameStr = plainText; - tail = hexDump.substring(expiringByteBuf.getTailPosition() * 2); + frameBody = plainText; + frameTail = hexDump.substring(expiringByteBuf.getTailPosition() * 2); } else { //是否需要解密 String encipherHex = hexDump.substring(expiringByteBuf.getFixPosition() * 2, (expiringByteBuf.getTailPosition()) * 2); String plainText = datagramEventProvider.getPlainTextDatagram(encipherHex, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); - bizFrameStr += plainText; + frameBody += plainText; } } - ByteBuf bizFrameByte = protocolFactory.getProtocolFieldConfigProvider().setDataContentBuf(bizFrameStr, headFrame, tail); - //增加头 - mergeWholeFrameByte.writeBytes(Hex.decode(headFrame)); - //增加业务内容 - mergeWholeFrameByte.writeBytes(bizFrameByte); - //增加尾部 - mergeWholeFrameByte.writeBytes(Hex.decode(tail)); - log.debug("--合并后--" + ByteBufUtil.hexDump(mergeWholeFrameByte)); - return mergeWholeFrameByte; + return FrameNodeStr.builder() + .frameHeader(frameHeader) + .frameBody(frameBody) + .frameTail(frameTail) + .build(); + } + + /** + * 通过设备编号获取挂起的帧 + * + * @param protocolFactory + * @param parseFixedDataMap + * @param byteBufContent + * @return + */ + private List getHangUpFrame(AbstractProtocolConfigFactory protocolFactory, Map parseFixedDataMap, ByteBuf byteBufContent) { + ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); + String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); + //获取挂起的半包帧结构,并排序 + List sortList = getStoreHalfPackBuf(key); + Map currentFrameFixedProperty = protocolFactory.getProtocolFieldConfigProvider().getFixedProperty(byteBufContent, protocolConfig); + //添加当前的帧信息 + sortList.add(MatchDataStore.ExpiringByteBuf.builder() + .byteBuf(byteBufContent) + .tailPosition(currentFrameFixedProperty.get(TAIL_POSITION)) + .fixPosition(currentFrameFixedProperty.get(FIXED_POSITION)) + .fixedStoreMap(parseFixedDataMap) + .build()); + return sortList; } @@ -145,6 +179,7 @@ List storeList = new ArrayList<>(); String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); // key += Thread.currentThread().getName(); + //当前设备编号作为key值进行存储 if (MatchDataStore.storeMap.containsKey(key)) { storeList = MatchDataStore.storeMap.get(key); } @@ -178,6 +213,12 @@ return sortStoreHalfPackBuf(storeList); } + /** + * 执行帧排序 + * + * @param storeHalfPackBufList + * @return + */ private List sortStoreHalfPackBuf(List storeHalfPackBufList) { return storeHalfPackBufList.stream() .sorted((e1, e2) -> ((Integer) e1.getFixedStoreMap().get("seq")) diff --git a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java index 09d5e8e..a522930 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java @@ -143,7 +143,7 @@ /** * 数据订阅 */ - public void storeData(List> bizDataMap) { + public void subscribeData(List> bizDataMap) { SensorhubProperties sensorhubProperties = SpringContextUtil.getBean(SensorhubProperties.class); DataSubscribeProvider subscribeProvider = SpringContextUtil.getBean(sensorhubProperties.getBean()); subscribeProvider.publishDataSubscribe(bizDataMap); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java index 87e0e07..d2e6c79 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/AepCommandSend.java @@ -35,49 +35,83 @@ @Slf4j public class AepCommandSend implements AepCommandEnum { - public void sendConfig(Map h2sDataMap) { - String productId = (String) h2sDataMap.get(PRODUCE_ID); - String deviceId = (String) h2sDataMap.get(DEVICE_ID); - BusConfigParam busConfigParam = new BusConfigParam(); + /** + * 处理电信平台的报文解析,并组装回复报文,发送电信平台 + * + * @param h2sDataMap + */ + public void handleAndReply(Map h2sDataMap) { AepConfig aepConfig = SpringContextUtil.getBean(AepConfig.class); - String aepKey = aepConfig.getKey(); - String aepSecret = aepConfig.getSecret(); - AepDeviceCommandLwmProfileClient client = AepDeviceCommandLwmProfileClient.newClient() - .appKey(aepKey).appSecret(aepSecret) - .build(); - Map dataGasMap = new HashMap<>(); - dataGasMap.put(SERVICE_ID, CONFIG); - dataGasMap.put(METHOD, CONFIG); - Map queryMap = new HashMap<>(); + AepDeviceCommandLwmProfileClient client = getAepClient(aepConfig); List list = new ArrayList<>(); SensorhubDecoder sensorhubDecoder = new SensorhubDecoder(); ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); bufferContent.writeBytes(JSON.toJSONString(h2sDataMap.get(PAYLOAD)).getBytes(Charset.forName(CHARSET))); + //对报文进行解码解析 sensorhubDecoder.decode(null, bufferContent, list); if (CollectionUtils.isNotEmpty(list)) { AbstractBuildReplyCommand abstractBuildReplyCommand = new DefaultReplyCommand(); - ByteBuf baseBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); - if (baseBytes == null) { + //构建回复报文 + ByteBuf replyBytes = abstractBuildReplyCommand.excute((ParseResult) list.get(0)); + if (replyBytes == null) { return; } - queryMap.put(VALUE, baseBytes.toString(Charset.forName(CHARSET))); - dataGasMap.put(PARAMS, queryMap); - busConfigParam.setCommand(dataGasMap); - busConfigParam.setProductId(productId); - busConfigParam.setDeviceId(deviceId); - busConfigParam.setLevel(1); - busConfigParam.setOperator(aepConfig.getOperator()); try { - busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); - CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); - request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value - request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + //组装请求返回的参数 + CreateCommandLwm2mProfileRequest request = getRequestContent(replyBytes, aepConfig, h2sDataMap); + //调用电信平台的客服端发送报文回复 CreateCommandLwm2mProfileResponse msgResponse = client.CreateCommandLwm2mProfile(request); log.info("send status-----" + msgResponse.getMessage()); } catch (Exception ex) { + log.error("电信平台发送失败,异常信息{}", ex); } finally { client.shutdown(); } } } + + /** + * 获取电信平台客户端 + * + * @param aepConfig + * @return + */ + private AepDeviceCommandLwmProfileClient getAepClient(AepConfig aepConfig) { + String aepKey = aepConfig.getKey(); + String aepSecret = aepConfig.getSecret(); + return AepDeviceCommandLwmProfileClient.newClient() + .appKey(aepKey).appSecret(aepSecret) + .build(); + } + + /** + * 构建aep平台的回复报文 + * + * @param replyBytes 回复的报文内容 + * @param aepConfig aep平台的配置 + * @param h2sDataMap + * @return + */ + private CreateCommandLwm2mProfileRequest getRequestContent(ByteBuf replyBytes, AepConfig aepConfig, Map h2sDataMap) { + Map queryMap = new HashMap<>(); + queryMap.put(VALUE, replyBytes.toString(Charset.forName(CHARSET))); + BusConfigParam busConfigParam = new BusConfigParam(); + CreateCommandLwm2mProfileRequest request = new CreateCommandLwm2mProfileRequest(); + Map dataGasMap = new HashMap<>(); + dataGasMap.put(SERVICE_ID, CONFIG); + dataGasMap.put(METHOD, CONFIG); + dataGasMap.put(PARAMS, queryMap); + String productId = (String) h2sDataMap.get(PRODUCE_ID); + String deviceId = (String) h2sDataMap.get(DEVICE_ID); + busConfigParam.setCommand(dataGasMap); + busConfigParam.setProductId(productId); + busConfigParam.setDeviceId(deviceId); + busConfigParam.setLevel(1); + busConfigParam.setOperator(aepConfig.getOperator()); + busConfigParam.setTtl(ObjectUtils.isNotEmpty(aepConfig.getTtl()) ? aepConfig.getTtl() : 3000); + request.setParamMasterKey(aepConfig.getParamMasterKey()); // single value + request.setBody(JSONObject.toJSONString(busConfigParam).getBytes()); + return request; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index 77b379f..b1f145f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -26,7 +26,7 @@ threadPoolExecutor.execute( () -> { AepCommandSend aepCommandSend = new AepCommandSend(); - aepCommandSend.sendConfig(dataMap); + aepCommandSend.handleAndReply(dataMap); } ); responseData.setCode(200); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java index 569b744..2258d38 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/listeners/ProtocolProcessEventListener.java @@ -23,6 +23,7 @@ /** * 流程节点事件监听器 + * 协议流程监听器,主要用于监控解析各个节点的解析内容,这些节点包括解密前的报文、解密后的明文、解析的数据、以及解析过程中出现的异常情况 */ public class ProtocolProcessEventListener { @@ -123,11 +124,11 @@ } /** - * 存库操作 + * 调用现场,将解析节点信息执行异步存库 * * @param devcode */ - public static void saveData(String devcode, ParseResult result) { + public static void saveData(String devcode) { //添加到线程池执行,异步存库 threadPool.execute( () -> { diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index 1a8e9d8..b91ed97 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -34,6 +34,8 @@ public class GenericProtocolParser extends ProtocolParserSupport implements ProtocolParser, ReplyCommandEnum { /** + * 方法中最重要的节点看根据序号查看 + *

* 标准数据报文的核心解析流程方法 * 1、前导码匹配报文协议 * 2、构建协议工厂(初始化获取与协议有关的数据库配置) @@ -48,9 +50,9 @@ */ @Override public ParseResult doParseProtocol(ByteBuf byteBuf, ProtocolConfigProvider protocolConfigProvider) { - //匹配前导码 + //1、匹配前导码,获取解析协议 ProtocolConfig protocolConfig = LeadingCodeMatcher.matchFrameLeadingCode(byteBuf, protocolConfigProvider); - //如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 + //健壮性校验,如果匹配不到前导码,则重置byteByf,判断是否是二次拆包发送,进行再次匹配 if (ObjectUtil.isEmpty(protocolConfig)) { return null; } @@ -61,7 +63,7 @@ Map parseFixedDataMap = getParseFixedDataMap(protocolFactory, byteBuf); try { devcode = (String) parseFixedDataMap.get("devcode"); - // 通过协议工厂匹配,匹配规则,获取规则配置 + //2、通过协议工厂匹配,匹配规则,获取规则配置 RuleConfig ruleConfig = getRuleConfig(protocolFactory, parseFixedDataMap); if (ObjectUtil.isEmpty(ruleConfig)) { //打印源数据,设备编号 @@ -74,19 +76,19 @@ } //创建规则相关的工厂,流程实例、字节解析、组合字段解析、字段解析规则等有关业务解析配置 AbstractRuleConfigFactory ruleConfigFactory = new DefaultRuleFactory(ruleConfig.getId()); - //获取流程实例配置 + //3、获取流程实例配置,判断配置是否完整正确 ProcessorInstanceProvider datagramEventProvider = ruleConfigFactory.getDatagramEventProvider(); DatagramEventConfig datagramEventConfig = datagramEventProvider.getProcessorInstance(); Assert.isFalse(Objects.isNull(datagramEventConfig), () -> { throw new EngineException(EngineExceptionEnum.PROTOCOL_INSTANCE_NULL); }); - //处理粘包拆包的主要组合 + //4、处理粘包拆包的主要组合,通过帧结构的进行处理 List frameStructDispenserList = ClazzUtil.getSubClassList(FrameStructMatcher.class, true); ByteBuf intactMessageByte = null; - //通过匹配帧结构获取完整的数据包,验证是否是一个完整的帧结构 + //通过匹配帧结构判断是否获取了完整的数据包,没有则挂起或舍弃 for (FrameStructMatcher frameStructMatcher : frameStructDispenserList) { //帧结构该协议,经过帧结构判断选定帧协议完整的数据报文 - if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory,datagramEventProvider, parseFixedDataMap)) != null) { + if ((intactMessageByte = frameStructMatcher.getIntactMessageByte(byteBuf, protocolFactory, datagramEventProvider, parseFixedDataMap)) != null) { break; } } @@ -97,45 +99,50 @@ } //获取报文的业务内容 ByteBuf bizDataByteBuf = protocolFactory.getProtocolFieldConfigProvider().getDataContentBuf(intactMessageByte); - //解密前的报文 - log.debug("解析的密文是----------" + ByteBufUtil.hexDump(bizDataByteBuf)); - //ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(bizDataByteBuf), 1); - //密文解析 + //5、业务内容密文解析 ByteBuf clearZeroPlainBuf = datagramEventProvider.getSafeDatagram(bizDataByteBuf, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); log.debug("解析的明文是----------" + ByteBufUtil.hexDump(clearZeroPlainBuf)); + //保存解密后报文信息到内存,ProtocolProcessEventListene协议流程监听器,主要 ProtocolProcessEventListener.setTask(devcode, ByteBufUtil.hexDump(clearZeroPlainBuf), 2); - //解密后报文 - //解析组合业务字段 + //6、解析tag业务字段 ruleConfigFactory.getCombinedFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, ruleConfigFactory.getFieldConfigProvider().getFieldConfigsMap(), protocolConfig.getFieldRuleConfigMap()); //解析单个业务字段 ruleConfigFactory.getFieldConfigProvider().parseDataField(ruleConfig, clearZeroPlainBuf, protocolConfig.getFieldRuleConfigMap()); - //构建发送解析任务 + //7、构建解析的数据集合 List> bizDataMap = buildStoreData(ruleConfigFactory, protocolFactory); //定制字段处理,例如分别解析的日期需要进行日期的和合并 invokeFieldPostProcessing(bizDataMap); - //根据解析内容,判断回复指令,构建解析返回内容 + //8、根据解析业务内容,判断回复指令,为构建解析返回内容做准备 List replyCommandPostProcessings = ClazzUtil.getSubClassList(AbstractReplyCommandPostProcessing.class, false); for (AbstractReplyCommandPostProcessing replyCommandPostProcessing : replyCommandPostProcessings) { - result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap,bizDataMap, result, ruleConfigFactory, protocolFactory); + result = replyCommandPostProcessing.obtainReplyCommand(parseFixedDataMap, bizDataMap, result, ruleConfigFactory, protocolFactory); } + //保存解析的业务字段信息到内存 ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); - //存储数据 - datagramEventProvider.storeData(bizDataMap); + //9、数据订阅 + datagramEventProvider.subscribeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 ProtocolProcessEventListener.setTask(devcode, rex.getMessage(), 4); result = null; frozenInvalidByteBuf(byteBuf, protocolConfig); + //出现异常情况,再次匹配,丢弃无法识别的帧即其中异常的数据 LeadingCodeMatcher.rematch(byteBuf, protocolConfigProvider.getMatchList()); } finally { - //数据发送,异步,转存整体数据 - ProtocolProcessEventListener.saveData(devcode, result); + //10、数据发送,异步,转存整体数据 + ProtocolProcessEventListener.saveData(devcode); return result; } } + /** + * 将匹配不满足的数据报文进行舍弃,将之后的内容进行冻结 + * + * @param byteBuf + * @param protocolConfig + */ private void frozenInvalidByteBuf(ByteBuf byteBuf, ProtocolConfig protocolConfig) { byteBuf.resetReaderIndex(); byteBuf.readBytes(protocolConfig.getPreFix().length()); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java index 014140a..387695a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/SensorhubDecoder.java @@ -23,13 +23,15 @@ import java.util.regex.Pattern; /** - * SensorhubDecoder 解码器 + * @author cz + * @date 2024 + * SensorhubDecoder */ @Slf4j public class SensorhubDecoder extends ByteToMessageDecoder { /** - * 自定义协议解析 + * 解码器,这里decode是数据解析前的预处理和解析的入口 * 帧解码分为以下阶段 * 1、帧预处理,判断是否是标准的报文结构,可以通过各个平台的特点,进行拦截预处理,同时根据特点进行处理粘包问题,获取标准的报文 * 2、将标准的报文,调用通用协议处理解析器,进行协议解析处理 @@ -41,6 +43,7 @@ List abstractPreProcessingList = ClazzUtil.getSubClassList(AbstractPretreatment.class, true); ByteBuf standardByteBuf = buffer; String oldBuff = ByteBufUtil.hexDump(standardByteBuf); + //预处理电信平台发来base64等流程信息,为完成帧结构解析报文处理的标准化做准备 for (AbstractPretreatment abstractPretreatment : abstractPreProcessingList) { standardByteBuf = abstractPretreatment.decode(standardByteBuf); } @@ -50,30 +53,22 @@ ProtocolParser protocolParser = new GenericProtocolParser(); ProtocolConfigProvider protocolConfigProvider = new ProtocolConfigProvider(); ParseResult parseResult = null; -// DirectMemoryReporter memoryReporter=new DirectMemoryReporter(); //无论什么情况都交给,这里组装的内容,在回复的时候有效使用 Integer pre = 0; + //防止出现一个byteBuf出现多种帧的情况,这里采取了截取的方式,即指针偏移,获取帧返回结构即可进行下个阶段 while (parseResult == null && standardByteBuf.readerIndex() != standardByteBuf.writerIndex()) { pre = standardByteBuf.readerIndex(); + //数据解析入口,doParseProtocol是数据包解析的主要是流程方法 parseResult = protocolParser.doParseProtocol(standardByteBuf, protocolConfigProvider); //避免死循环 if (pre == standardByteBuf.readerIndex()) { break; } } -// destroy(standardByteBuf); if (parseResult != null) { parseResult.setRequestCode(pretreatmentStatus); list.add(parseResult); } } - -// public void destroy(ByteBuf byteBuf) { -// if (byteBuf != null && byteBuf.refCnt() > 0) { -// byteBuf.release(); -// byteBuf = null; -// } -// } - } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java index 3e9160f..44d22f1 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/LeadingCodeMatcher.java @@ -43,7 +43,7 @@ /** * 查询匹配位置,进行再次匹配 - * 再次匹配 如何丢弃无法识别的帧, + * 再次匹配,丢弃无法识别的帧, *

* 如果是拆包序列2进入,需要重置byteBuf的读位置,进行重新匹配 * 匹配布上的数据 diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java index bab0601..e764e3a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java @@ -3,6 +3,7 @@ import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.casic.missiles.dto.FrameNodeStr; import com.casic.missiles.factory.AbstractProtocolConfigFactory; import com.casic.missiles.parser.matcher.FrameStructMatchSupport; import com.casic.missiles.parser.matcher.FrameStructMatcher; @@ -53,18 +54,21 @@ ByteBuf intactMessageByte = null; //暂时实现不进行跳帧的情况,都为临近的帧数据内容 if (!ObjectUtil.isEmpty(unpackFlag)) { + //匹配帧长度 ByteBuf matchByteBuf = matchLength(byteBuf, datagramEventProvide.getProcessorInstance().getSafeLength(), protocolFactory); //取到一个完整的帧 while (matchByteBuf != null) { unpackFlag = protocolFieldConfigProvider.getProtocolFieldValue(protocolConfig.getUnpackId(), matchByteBuf, protocolConfig.getFieldRuleConfigMap()); - //后续标志位结束 + //后续标志位结束,表示当前帧可以解析 if (unpackFlag == 1) { - //表示可以截取 + //帧结构合并判断 intactMessageByte = mergeMarkFrame(matchByteBuf, protocolFactory, datagramEventProvide, parseFixedDataMap); return intactMessageByte; } else { + //表示当前帧不可以解析,需要进行挂起操作 storeHalfPackBuf(protocolFactory, matchByteBuf); } + //前一帧没有解析,当前流可以读取,需再次进行匹配 matchByteBuf = matchLength(byteBuf, datagramEventProvide.getProcessorInstance().getSafeLength(), protocolFactory); } } @@ -79,25 +83,35 @@ * 2、根据帧结构位置信息,合并相关的帧信息 */ private ByteBuf mergeMarkFrame(ByteBuf byteBufContent, AbstractProtocolConfigFactory protocolFactory, ProcessorInstanceProvider datagramEventProvider, Map parseFixedDataMap) { - ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); - Map currentFrameFixedProperty = protocolFactory.getProtocolFieldConfigProvider().getFixedProperty(byteBufContent, protocolConfig); //重置 ByteBuf mergeWholeFrameByte = ByteBufAllocator.DEFAULT.buffer(); - String tail = "", headFrame = "", bizFrameStr = ""; log.debug("--合并前" + ByteBufUtil.hexDump(byteBufContent)); - String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); - List sortList = getStoreHalfPackBuf(key); - //添加当前的帧信息 - sortList.add(MatchDataStore.ExpiringByteBuf.builder() - .byteBuf(byteBufContent) - .tailPosition(currentFrameFixedProperty.get(TAIL_POSITION)) - .fixPosition(currentFrameFixedProperty.get(FIXED_POSITION)) - .fixedStoreMap(parseFixedDataMap) - .build()); - //进行seq排序 - sortList = sortList.stream().sorted( - (e1, e2) -> ((Integer) e1.getFixedStoreMap().get(SEQ)).compareTo((Integer) e2.getFixedStoreMap().get(SEQ)) - ).collect(Collectors.toList()); + //获取挂起的帧并按seq进行排序,*******************这个seq属于业务定制,后续可以酌情修改 + List sortList = getHangUpFrame(protocolFactory, parseFixedDataMap, byteBufContent); + //对帧结构进行头、内容、尾划分,执行帧合并操作 + FrameNodeStr frameNodeStr = doMergeMarkFrame(sortList, protocolFactory, datagramEventProvider); + ByteBuf bizFrameByte = protocolFactory.getProtocolFieldConfigProvider().setDataContentBuf(frameNodeStr.getFrameBody(), frameNodeStr.getFrameHeader(), frameNodeStr.getFrameTail()); + //增加头 + mergeWholeFrameByte.writeBytes(Hex.decode(frameNodeStr.getFrameHeader())); + //增加业务内容 + mergeWholeFrameByte.writeBytes(bizFrameByte); + //增加尾部 + mergeWholeFrameByte.writeBytes(Hex.decode(frameNodeStr.getFrameTail())); + log.debug("--合并后--" + ByteBufUtil.hexDump(mergeWholeFrameByte)); + return mergeWholeFrameByte; + } + + /** + * 合并一小时内挂起的帧信息,如果只有当前 + * + * @param sortList + * @param protocolFactory + * @param datagramEventProvider + * @return + */ + private FrameNodeStr doMergeMarkFrame(List sortList, AbstractProtocolConfigFactory protocolFactory, ProcessorInstanceProvider datagramEventProvider) { + ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); + String frameTail = "", frameHeader = "", frameBody = ""; //会存在多个帧拼接在一起的的情况,进行合并处理 for (MatchDataStore.ExpiringByteBuf expiringByteBuf : sortList) { ByteBuf currentByteBuf = expiringByteBuf.getByteBuf(); @@ -105,29 +119,49 @@ String hexDump = ByteBufUtil.hexDump(currentByteBuf); //合并报文,需要将密文进行提前解密合并,获取当前固定帧结构的关系map Supplier fieldLengthSupplier = protocolFactory.getProtocolFieldConfigProvider().getHistoryBizFieldLength(expiringByteBuf.getFixedStoreMap(), protocolConfig); - if (StringUtils.isEmpty(bizFrameStr)) { - headFrame = hexDump.substring(0, expiringByteBuf.getFixPosition() * 2); + if (StringUtils.isEmpty(frameBody)) { + frameHeader = hexDump.substring(0, expiringByteBuf.getFixPosition() * 2); //首次获取业务数据报文,明文合并,密文合并还存在问题 String encipherHex = hexDump.substring(expiringByteBuf.getFixPosition() * 2, (expiringByteBuf.getTailPosition()) * 2); String plainText = datagramEventProvider.getPlainTextDatagram(encipherHex, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); - bizFrameStr = plainText; - tail = hexDump.substring(expiringByteBuf.getTailPosition() * 2); + frameBody = plainText; + frameTail = hexDump.substring(expiringByteBuf.getTailPosition() * 2); } else { //是否需要解密 String encipherHex = hexDump.substring(expiringByteBuf.getFixPosition() * 2, (expiringByteBuf.getTailPosition()) * 2); String plainText = datagramEventProvider.getPlainTextDatagram(encipherHex, protocolFactory.getProtocolFieldConfigProvider().getFixFieldConfigMap(), fieldLengthSupplier); - bizFrameStr += plainText; + frameBody += plainText; } } - ByteBuf bizFrameByte = protocolFactory.getProtocolFieldConfigProvider().setDataContentBuf(bizFrameStr, headFrame, tail); - //增加头 - mergeWholeFrameByte.writeBytes(Hex.decode(headFrame)); - //增加业务内容 - mergeWholeFrameByte.writeBytes(bizFrameByte); - //增加尾部 - mergeWholeFrameByte.writeBytes(Hex.decode(tail)); - log.debug("--合并后--" + ByteBufUtil.hexDump(mergeWholeFrameByte)); - return mergeWholeFrameByte; + return FrameNodeStr.builder() + .frameHeader(frameHeader) + .frameBody(frameBody) + .frameTail(frameTail) + .build(); + } + + /** + * 通过设备编号获取挂起的帧 + * + * @param protocolFactory + * @param parseFixedDataMap + * @param byteBufContent + * @return + */ + private List getHangUpFrame(AbstractProtocolConfigFactory protocolFactory, Map parseFixedDataMap, ByteBuf byteBufContent) { + ProtocolConfig protocolConfig = protocolFactory.getProtocolConfigProvider().getCurrentProtocolConfig(); + String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); + //获取挂起的半包帧结构,并排序 + List sortList = getStoreHalfPackBuf(key); + Map currentFrameFixedProperty = protocolFactory.getProtocolFieldConfigProvider().getFixedProperty(byteBufContent, protocolConfig); + //添加当前的帧信息 + sortList.add(MatchDataStore.ExpiringByteBuf.builder() + .byteBuf(byteBufContent) + .tailPosition(currentFrameFixedProperty.get(TAIL_POSITION)) + .fixPosition(currentFrameFixedProperty.get(FIXED_POSITION)) + .fixedStoreMap(parseFixedDataMap) + .build()); + return sortList; } @@ -145,6 +179,7 @@ List storeList = new ArrayList<>(); String key = (String) protocolFactory.getProtocolFieldConfigProvider().getStoreObjectMap().get("devcode"); // key += Thread.currentThread().getName(); + //当前设备编号作为key值进行存储 if (MatchDataStore.storeMap.containsKey(key)) { storeList = MatchDataStore.storeMap.get(key); } @@ -178,6 +213,12 @@ return sortStoreHalfPackBuf(storeList); } + /** + * 执行帧排序 + * + * @param storeHalfPackBufList + * @return + */ private List sortStoreHalfPackBuf(List storeHalfPackBufList) { return storeHalfPackBufList.stream() .sorted((e1, e2) -> ((Integer) e1.getFixedStoreMap().get("seq")) diff --git a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java index 09d5e8e..a522930 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java @@ -143,7 +143,7 @@ /** * 数据订阅 */ - public void storeData(List> bizDataMap) { + public void subscribeData(List> bizDataMap) { SensorhubProperties sensorhubProperties = SpringContextUtil.getBean(SensorhubProperties.class); DataSubscribeProvider subscribeProvider = SpringContextUtil.getBean(sensorhubProperties.getBean()); subscribeProvider.publishDataSubscribe(bizDataMap); diff --git a/sensorhub-support/src/main/java/com/casic/missiles/dto/FrameNodeStr.java b/sensorhub-support/src/main/java/com/casic/missiles/dto/FrameNodeStr.java new file mode 100644 index 0000000..8010b65 --- /dev/null +++ b/sensorhub-support/src/main/java/com/casic/missiles/dto/FrameNodeStr.java @@ -0,0 +1,30 @@ +package com.casic.missiles.dto; + +import lombok.Builder; +import lombok.Data; + +/** + * 帧节点信息 + * + * @author cz + * @date 2024-05-11 + */ +@Builder +@Data +public class FrameNodeStr { + + /** + * 帧尾 + */ + private String frameTail; + /** + * 帧头 + */ + private String frameHeader; + /** + * 帧内容 + */ + private String frameBody; + + +}