diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java index 1e3e9fb..21c7b07 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java @@ -13,6 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static cn.hutool.core.util.ClassUtil.getClasses; /** * SensorhubServer netty服务端 @@ -36,7 +41,7 @@ @Autowired private SensorhubProperties sensorhubProperties; @Autowired - private SensorhubServerChannelInitialHandler serverChannelInitialHandler; + private SensorhubServerChannelInitialHandler sensorhubHandler; public SensorhubServer(SensorhubProperties sensorhubProperties) { this.sensorhubProperties = sensorhubProperties; @@ -71,7 +76,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 - .childHandler(serverChannelInitialHandler) // 指定ChannelHandler + .childHandler(sensorhubHandler) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 当SO_KEEPALIVE=true的时候,服务端可以探测客户端的连接是否还存活着,如果客户端关闭了,那么服务端的连接可以关闭掉,释放资源 // 绑定端口,开始接收进来的连接 @@ -98,9 +103,5 @@ } - public static void main(String[] args) throws Exception { - SensorhubServer server = new SensorhubServer(new SensorhubProperties()); - server.startServer(); - } } \ No newline at end of file diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java index 1e3e9fb..21c7b07 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java @@ -13,6 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static cn.hutool.core.util.ClassUtil.getClasses; /** * SensorhubServer netty服务端 @@ -36,7 +41,7 @@ @Autowired private SensorhubProperties sensorhubProperties; @Autowired - private SensorhubServerChannelInitialHandler serverChannelInitialHandler; + private SensorhubServerChannelInitialHandler sensorhubHandler; public SensorhubServer(SensorhubProperties sensorhubProperties) { this.sensorhubProperties = sensorhubProperties; @@ -71,7 +76,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 - .childHandler(serverChannelInitialHandler) // 指定ChannelHandler + .childHandler(sensorhubHandler) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 当SO_KEEPALIVE=true的时候,服务端可以探测客户端的连接是否还存活着,如果客户端关闭了,那么服务端的连接可以关闭掉,释放资源 // 绑定端口,开始接收进来的连接 @@ -98,9 +103,5 @@ } - public static void main(String[] args) throws Exception { - SensorhubServer server = new SensorhubServer(new SensorhubProperties()); - server.startServer(); - } } \ No newline at end of file diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java deleted file mode 100644 index 33042a3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; - -import java.util.List; - -public class AbstractPreprocessingCodec extends MessageToMessageDecoder { - - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - } -} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java index 1e3e9fb..21c7b07 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java @@ -13,6 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static cn.hutool.core.util.ClassUtil.getClasses; /** * SensorhubServer netty服务端 @@ -36,7 +41,7 @@ @Autowired private SensorhubProperties sensorhubProperties; @Autowired - private SensorhubServerChannelInitialHandler serverChannelInitialHandler; + private SensorhubServerChannelInitialHandler sensorhubHandler; public SensorhubServer(SensorhubProperties sensorhubProperties) { this.sensorhubProperties = sensorhubProperties; @@ -71,7 +76,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 - .childHandler(serverChannelInitialHandler) // 指定ChannelHandler + .childHandler(sensorhubHandler) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 当SO_KEEPALIVE=true的时候,服务端可以探测客户端的连接是否还存活着,如果客户端关闭了,那么服务端的连接可以关闭掉,释放资源 // 绑定端口,开始接收进来的连接 @@ -98,9 +103,5 @@ } - public static void main(String[] args) throws Exception { - SensorhubServer server = new SensorhubServer(new SensorhubProperties()); - server.startServer(); - } } \ No newline at end of file diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java deleted file mode 100644 index 33042a3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; - -import java.util.List; - -public class AbstractPreprocessingCodec extends MessageToMessageDecoder { - - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java deleted file mode 100644 index b1ee6c4..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.nio.charset.Charset; -import java.util.List; - -/** - * @description: 将从接口取到的数据编码 - * @author: cz - * @create: 2023-05-04 15:15 - **/ -@Slf4j -@Service -public class AepPreprocessingCodec extends AbstractPreprocessingCodec { - private final Base64Dialect dialect; - - public AepPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public AepPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析 预处理 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java index 1e3e9fb..21c7b07 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java @@ -13,6 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static cn.hutool.core.util.ClassUtil.getClasses; /** * SensorhubServer netty服务端 @@ -36,7 +41,7 @@ @Autowired private SensorhubProperties sensorhubProperties; @Autowired - private SensorhubServerChannelInitialHandler serverChannelInitialHandler; + private SensorhubServerChannelInitialHandler sensorhubHandler; public SensorhubServer(SensorhubProperties sensorhubProperties) { this.sensorhubProperties = sensorhubProperties; @@ -71,7 +76,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 - .childHandler(serverChannelInitialHandler) // 指定ChannelHandler + .childHandler(sensorhubHandler) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 当SO_KEEPALIVE=true的时候,服务端可以探测客户端的连接是否还存活着,如果客户端关闭了,那么服务端的连接可以关闭掉,释放资源 // 绑定端口,开始接收进来的连接 @@ -98,9 +103,5 @@ } - public static void main(String[] args) throws Exception { - SensorhubServer server = new SensorhubServer(new SensorhubProperties()); - server.startServer(); - } } \ No newline at end of file diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java deleted file mode 100644 index 33042a3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; - -import java.util.List; - -public class AbstractPreprocessingCodec extends MessageToMessageDecoder { - - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java deleted file mode 100644 index b1ee6c4..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.nio.charset.Charset; -import java.util.List; - -/** - * @description: 将从接口取到的数据编码 - * @author: cz - * @create: 2023-05-04 15:15 - **/ -@Slf4j -@Service -public class AepPreprocessingCodec extends AbstractPreprocessingCodec { - private final Base64Dialect dialect; - - public AepPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public AepPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析 预处理 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java deleted file mode 100644 index 6e781b3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - -public class MessageDecoder extends ByteToMessageDecoder { - @Override - protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - System.out.println(ByteBufUtil.hexDump(byteBuf)); - } -} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java index 1e3e9fb..21c7b07 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java @@ -13,6 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static cn.hutool.core.util.ClassUtil.getClasses; /** * SensorhubServer netty服务端 @@ -36,7 +41,7 @@ @Autowired private SensorhubProperties sensorhubProperties; @Autowired - private SensorhubServerChannelInitialHandler serverChannelInitialHandler; + private SensorhubServerChannelInitialHandler sensorhubHandler; public SensorhubServer(SensorhubProperties sensorhubProperties) { this.sensorhubProperties = sensorhubProperties; @@ -71,7 +76,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 - .childHandler(serverChannelInitialHandler) // 指定ChannelHandler + .childHandler(sensorhubHandler) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 当SO_KEEPALIVE=true的时候,服务端可以探测客户端的连接是否还存活着,如果客户端关闭了,那么服务端的连接可以关闭掉,释放资源 // 绑定端口,开始接收进来的连接 @@ -98,9 +103,5 @@ } - public static void main(String[] args) throws Exception { - SensorhubServer server = new SensorhubServer(new SensorhubProperties()); - server.startServer(); - } } \ No newline at end of file diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java deleted file mode 100644 index 33042a3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; - -import java.util.List; - -public class AbstractPreprocessingCodec extends MessageToMessageDecoder { - - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java deleted file mode 100644 index b1ee6c4..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.nio.charset.Charset; -import java.util.List; - -/** - * @description: 将从接口取到的数据编码 - * @author: cz - * @create: 2023-05-04 15:15 - **/ -@Slf4j -@Service -public class AepPreprocessingCodec extends AbstractPreprocessingCodec { - private final Base64Dialect dialect; - - public AepPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public AepPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析 预处理 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java deleted file mode 100644 index 6e781b3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - -public class MessageDecoder extends ByteToMessageDecoder { - @Override - protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - System.out.println(ByteBufUtil.hexDump(byteBuf)); - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java deleted file mode 100644 index 062df5f..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.Charset; -import java.util.List; - -@Slf4j -public class NbPreprocessingCodec extends AbstractPreprocessingCodec { - - private final Base64Dialect dialect; - - public NbPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public NbPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析,预处理base编码 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java index 1e3e9fb..21c7b07 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java @@ -13,6 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static cn.hutool.core.util.ClassUtil.getClasses; /** * SensorhubServer netty服务端 @@ -36,7 +41,7 @@ @Autowired private SensorhubProperties sensorhubProperties; @Autowired - private SensorhubServerChannelInitialHandler serverChannelInitialHandler; + private SensorhubServerChannelInitialHandler sensorhubHandler; public SensorhubServer(SensorhubProperties sensorhubProperties) { this.sensorhubProperties = sensorhubProperties; @@ -71,7 +76,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 - .childHandler(serverChannelInitialHandler) // 指定ChannelHandler + .childHandler(sensorhubHandler) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 当SO_KEEPALIVE=true的时候,服务端可以探测客户端的连接是否还存活着,如果客户端关闭了,那么服务端的连接可以关闭掉,释放资源 // 绑定端口,开始接收进来的连接 @@ -98,9 +103,5 @@ } - public static void main(String[] args) throws Exception { - SensorhubServer server = new SensorhubServer(new SensorhubProperties()); - server.startServer(); - } } \ No newline at end of file diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java deleted file mode 100644 index 33042a3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; - -import java.util.List; - -public class AbstractPreprocessingCodec extends MessageToMessageDecoder { - - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java deleted file mode 100644 index b1ee6c4..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.nio.charset.Charset; -import java.util.List; - -/** - * @description: 将从接口取到的数据编码 - * @author: cz - * @create: 2023-05-04 15:15 - **/ -@Slf4j -@Service -public class AepPreprocessingCodec extends AbstractPreprocessingCodec { - private final Base64Dialect dialect; - - public AepPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public AepPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析 预处理 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java deleted file mode 100644 index 6e781b3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - -public class MessageDecoder extends ByteToMessageDecoder { - @Override - protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - System.out.println(ByteBufUtil.hexDump(byteBuf)); - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java deleted file mode 100644 index 062df5f..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.Charset; -import java.util.List; - -@Slf4j -public class NbPreprocessingCodec extends AbstractPreprocessingCodec { - - private final Base64Dialect dialect; - - public NbPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public NbPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析,预处理base编码 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java index e6c77bf..827744f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java @@ -1,13 +1,15 @@ package com.casic.missiles.netty.handler; +import com.casic.missiles.codec.SensorhubCodec; import com.casic.missiles.codec.SensorhubDecoder; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import org.springframework.stereotype.Component; import java.util.List; @@ -15,22 +17,21 @@ * @author cz * @date 2023-06-05 */ -@Service +@Component public class SensorhubServerChannelInitialHandler extends ChannelInitializer { @Autowired - private List proccessCodecList; + private List abstractPreprocessingList; @Override protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); - for (AbstractPreprocessingCodec abstractPreprocessingCodec : proccessCodecList) { - pipeline.addLast(abstractPreprocessingCodec); + for (AbstractPreprocessing abstractPreprocessing:abstractPreprocessingList) { + pipeline.addLast(abstractPreprocessing); } -// pipeline.addLast(new MessageDecoder()); // pipeline.addLast(new LengthFieldBasedFrameDecoder(1024*100, 0, 2, 0, 2)); -// pipeline.addLast("codec", new SensorhubCodec()); + pipeline.addLast("codec", new SensorhubCodec()); pipeline.addLast(new SensorhubDecoder()); pipeline.addLast(new SensorhubServerHandler()); // 心跳续约 diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml index 46f3832..ae68ea8 100644 --- a/sensorhub-core/pom.xml +++ b/sensorhub-core/pom.xml @@ -75,6 +75,14 @@ 0.0.1-SNAPSHOT + + + org.reflections + reflections + 0.9.10 + + + diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java new file mode 100644 index 0000000..80c3458 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/ClassUtil.java @@ -0,0 +1,38 @@ +package com.casic.missiles.codec; + +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import lombok.extern.slf4j.Slf4j; +import org.reflections.Reflections; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Slf4j +public class ClassUtil { + +// public static List get(Class clazz){ +// List abstractPreprocessings = new ArrayList<>(); +// try{ +// String pk = clazz.getPackage().getName(); +// String path = pk.replace('.', '/'); +// //获取该路径下所有类 +// Reflections reflections = new Reflections(path); +// //获取继承了ISuperClass的所有类 +// Set> classSet = reflections.getSubTypesOf(clazz); +// +// for (Class clazz : classSet) { +// // 实例化获取到的类 +// T obj = clazz.newInstance(); +// // TODO 自己的处理逻辑 +// abstractPreprocessings.add(obj); +// } +// }catch (IllegalAccessException iae){ +// +// }catch (InstantiationException iae){ +// +// } +// return abstractPreprocessings; +// } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java index 98c61fb..25bfa04 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/SensorhubDecoder.java @@ -28,6 +28,7 @@ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List list) throws Exception { + // 可读长度必须大于帧的最小长度 if (buffer.readableBytes() >= MIN_FRAME_LEN) { System.out.println(ByteBufUtil.hexDump(buffer)); diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java new file mode 100644 index 0000000..3658376 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/AbstractPreprocessing.java @@ -0,0 +1,15 @@ +package com.casic.missiles.codec.predecodec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class AbstractPreprocessing extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + list.add(byteBuf); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java new file mode 100644 index 0000000..ae08584 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/AepPreprocessing.java @@ -0,0 +1,56 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: cz + * @create: 2023-05-04 15:15 + **/ +@Slf4j +@Component +public class AepPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public AepPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public AepPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析 预处理 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + }else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java new file mode 100644 index 0000000..6ab1368 --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/codec/predecodec/impl/NbPreprocessing.java @@ -0,0 +1,50 @@ +package com.casic.missiles.codec.predecodec.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.nio.charset.Charset; +import java.util.List; + +@Slf4j +@Component +public class NbPreprocessing extends AbstractPreprocessing { + + private final Base64Dialect dialect; + + public NbPreprocessing() { + this(Base64Dialect.STANDARD); + } + + public NbPreprocessing(Base64Dialect dialect) { + this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); + } + + //执行nb平台前的协议解析,预处理base编码 + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List out) throws Exception { + //aep平台处理 + if (msg.toString(Charset.defaultCharset()).contains("aep")) { + String httpContent = msg.toString(Charset.defaultCharset()); + log.info(" String : " + msg.toString(Charset.defaultCharset())); + String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); + String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); + log.info("----------------------------------" + values); + ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); + bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); + out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); + } else { + out.add(msg); + } + } + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java index 1e3e9fb..21c7b07 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/SensorhubServer.java @@ -13,6 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static cn.hutool.core.util.ClassUtil.getClasses; /** * SensorhubServer netty服务端 @@ -36,7 +41,7 @@ @Autowired private SensorhubProperties sensorhubProperties; @Autowired - private SensorhubServerChannelInitialHandler serverChannelInitialHandler; + private SensorhubServerChannelInitialHandler sensorhubHandler; public SensorhubServer(SensorhubProperties sensorhubProperties) { this.sensorhubProperties = sensorhubProperties; @@ -71,7 +76,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 - .childHandler(serverChannelInitialHandler) // 指定ChannelHandler + .childHandler(sensorhubHandler) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 当SO_KEEPALIVE=true的时候,服务端可以探测客户端的连接是否还存活着,如果客户端关闭了,那么服务端的连接可以关闭掉,释放资源 // 绑定端口,开始接收进来的连接 @@ -98,9 +103,5 @@ } - public static void main(String[] args) throws Exception { - SensorhubServer server = new SensorhubServer(new SensorhubProperties()); - server.startServer(); - } } \ No newline at end of file diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java deleted file mode 100644 index 33042a3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AbstractPreprocessingCodec.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; - -import java.util.List; - -public class AbstractPreprocessingCodec extends MessageToMessageDecoder { - - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java deleted file mode 100644 index b1ee6c4..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/AepPreprocessingCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.nio.charset.Charset; -import java.util.List; - -/** - * @description: 将从接口取到的数据编码 - * @author: cz - * @create: 2023-05-04 15:15 - **/ -@Slf4j -@Service -public class AepPreprocessingCodec extends AbstractPreprocessingCodec { - private final Base64Dialect dialect; - - public AepPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public AepPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析 预处理 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java deleted file mode 100644 index 6e781b3..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/MessageDecoder.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.casic.missiles.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - -public class MessageDecoder extends ByteToMessageDecoder { - @Override - protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - System.out.println(ByteBufUtil.hexDump(byteBuf)); - } -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java deleted file mode 100644 index 062df5f..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/NbPreprocessingCodec.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.casic.missiles.netty.handler; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.base64.Base64Dialect; -import io.netty.util.internal.ObjectUtil; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.Charset; -import java.util.List; - -@Slf4j -public class NbPreprocessingCodec extends AbstractPreprocessingCodec { - - private final Base64Dialect dialect; - - public NbPreprocessingCodec() { - this(Base64Dialect.STANDARD); - } - - public NbPreprocessingCodec(Base64Dialect dialect) { - this.dialect = ObjectUtil.checkNotNull(dialect, "dialect"); - } - - //执行nb平台前的协议解析,预处理base编码 - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) { - //aep平台处理 - if (msg.toString(Charset.defaultCharset()).contains("aep")) { - String httpContent = msg.toString(Charset.defaultCharset()); - log.info(" String : " + msg.toString(Charset.defaultCharset())); - String majorContent = httpContent.substring(httpContent.indexOf("{"), httpContent.lastIndexOf("}") + 1); - String values = String.valueOf(((JSONObject) ((JSONObject) JSON.parseObject(majorContent).get("payload")).get("serviceData")).get("Value")); - log.info("----------------------------------" + values); - ByteBuf bufferContent = ByteBufAllocator.DEFAULT.buffer(); - bufferContent.writeBytes(values.getBytes(Charset.forName("ISO-8859-1"))); - out.add(Base64.decode(bufferContent, bufferContent.readerIndex(), bufferContent.readableBytes(), this.dialect)); - } - } - -} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java index e6c77bf..827744f 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/netty/handler/SensorhubServerChannelInitialHandler.java @@ -1,13 +1,15 @@ package com.casic.missiles.netty.handler; +import com.casic.missiles.codec.SensorhubCodec; import com.casic.missiles.codec.SensorhubDecoder; +import com.casic.missiles.codec.predecodec.AbstractPreprocessing; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import org.springframework.stereotype.Component; import java.util.List; @@ -15,22 +17,21 @@ * @author cz * @date 2023-06-05 */ -@Service +@Component public class SensorhubServerChannelInitialHandler extends ChannelInitializer { @Autowired - private List proccessCodecList; + private List abstractPreprocessingList; @Override protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); - for (AbstractPreprocessingCodec abstractPreprocessingCodec : proccessCodecList) { - pipeline.addLast(abstractPreprocessingCodec); + for (AbstractPreprocessing abstractPreprocessing:abstractPreprocessingList) { + pipeline.addLast(abstractPreprocessing); } -// pipeline.addLast(new MessageDecoder()); // pipeline.addLast(new LengthFieldBasedFrameDecoder(1024*100, 0, 2, 0, 2)); -// pipeline.addLast("codec", new SensorhubCodec()); + pipeline.addLast("codec", new SensorhubCodec()); pipeline.addLast(new SensorhubDecoder()); pipeline.addLast(new SensorhubServerHandler()); // 心跳续约 diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java index 85506a5..2bfc213 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/matcher/impl/FrameMarkMatcher.java @@ -37,25 +37,19 @@ fieldConfigDao.getFieldConfigById(protocolConfig.getUnpackId()); Integer totalLength = fieldParser.getTotalLength(protocolConfig, byteBuf); Integer upback = 0; + ByteBuf mergeWholeFrameByte = null; if (!ObjectUtil.isEmpty(UnpackConfig)) { while (hasNextFullFrame(byteBuf, protocolConfig)) { upback = (Integer) fieldResolverManger.filedParserManger(byteBuf, UnpackConfig, null); //是否存在后续位 if (upback == 1) {//使用esayRule //表示可以截取 - ByteBuf mergeWholeFrameByte = getWholeDatagramByte(byteBuf, protocolConfig, totalLength); - if (byteBuf.hasArray()) { - upback = 0; - } else { - upback = 1; - } + mergeWholeFrameByte = getWholeDatagramByte(byteBuf, protocolConfig, totalLength); + return mergeWholeFrameByte; } else { byteBuf.readerIndex(byteBuf.readerIndex() + getNextFrameOffset(byteBuf, protocolConfig)); } } - if (upback == 1) { - return null; - } } return null; }