{
+
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+
+ // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串
+ // 这里假设使用的是UTF-8字符集
+ pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
+
+ // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf
+ // 这样服务器发送字符串时,客户端可以直接接收到字符串
+ pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
+
+ // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑
+ pipeline.addLast("handler", new RobotChannelHandler());
+ }
+}
\ No newline at end of file
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java
new file mode 100644
index 0000000..11878ff
--- /dev/null
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java
@@ -0,0 +1,101 @@
+package com.casic.missiles.modular.system.netty;
+
+
+import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Slf4j
+@Component
+public class RobotChannelHandler extends ChannelInboundHandlerAdapter {
+ //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc
+// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ private final ExecutorService executor = Executors.newCachedThreadPool();
+
+
+ private static RouteInfoServiceImpl routeInfoService;
+
+ public RobotChannelHandler() {
+ }
+
+ @Autowired
+ public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) {
+ this.routeInfoService = routeInfoService;
+ }
+
+ /**
+ * 服务器端收到任何一个客户端的消息都会触发这个方法
+ * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ super.channelRead(ctx, msg);
+ Channel channel = ctx.channel();
+// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8)));
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ String encryptData = msg.toString();
+ DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText());
+ System.out.println("接收到加密数据:" + encryptData);
+ routeInfoService.getMsg(1L,encryptData);
+// String decryptData = RSAUtil.getDecryptMsg(encryptData);
+// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) {
+// String devCode = decryptData.split(",")[0];
+// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText());
+// dataPanGasService.process(decryptData);
+// }
+ }
+ });
+ }
+
+ //表示服务端与客户端连接建立
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ Channel channel = ctx.channel(); //其实相当于一个connection
+
+ /**
+ * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush
+ *
+ * 先去广播,再将自己加入到channelGroup中
+ */
+// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n");
+// channelGroup.add(channel);
+ DeviceCommon.addChannel(channel.id().asLongText(), channel);
+
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ Channel channel = ctx.channel();
+ DeviceCommon.removeChannelByName(channel.id().asLongText());
+ }
+
+ //连接处于活动状态
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ Channel channel = ctx.channel();
+ System.out.println(channel.remoteAddress() + " 上线了");
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ Channel channel = ctx.channel();
+ System.out.println(channel.remoteAddress() + " 下线了");
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+
+}
\ No newline at end of file
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java
new file mode 100644
index 0000000..fd21dee
--- /dev/null
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java
@@ -0,0 +1,25 @@
+package com.casic.missiles.modular.system.netty;
+
+import com.casic.missiles.core.base.controller.ExportController;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.swagger.annotations.Api;
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.nio.charset.StandardCharsets;
+
+@Api(tags = "test")
+@RequiredArgsConstructor
+@RestController
+@RequestMapping("/test")
+public class TestController extends ExportController {
+ @PostMapping("/test")
+ public void repairLog() {
+ Channel channel = DeviceCommon.getChannelByName("1231313123");
+ channel.writeAndFlush(Unpooled.copiedBuffer("send command".getBytes(StandardCharsets.UTF_8)));
+ }
+
+}
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java
index 814ffde..1658f2e 100644
--- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java
@@ -6,6 +6,7 @@
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
+import java.util.Map;
/**
*
@@ -25,6 +26,8 @@
void control(String command,Long robotDogId);
+ void processData(Map map);
+
Page pageList(Page page , RouteInfoDTO requestDTO);
diff --git a/casic-server/pom.xml b/casic-server/pom.xml
index 67e7aea..e7423c0 100644
--- a/casic-server/pom.xml
+++ b/casic-server/pom.xml
@@ -75,6 +75,17 @@
commons-httpclient
3.1
+
+
+ io.netty
+ netty-all
+
+
+
+ org.springframework.boot
+ spring-boot-starter-websocket
+ 2.4.5
+
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java
new file mode 100644
index 0000000..f2c946f
--- /dev/null
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java
@@ -0,0 +1,18 @@
+package com.casic.missiles.modular.system.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+// tomcat启动无需配置
+@Configuration
+public class WebSocketConfig {
+ /**
+ * 注入ServerEndpointExporter,
+ * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
+ */
+ @Bean
+ public ServerEndpointExporter serverEndpointExporter() {
+ return new ServerEndpointExporter();
+ }
+}
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java
index 80e4f5c..e0bc9c6 100644
--- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java
@@ -1,6 +1,7 @@
package com.casic.missiles.modular.system.controller;
import com.casic.missiles.modular.system.dto.ReturnDTO;
+import com.casic.missiles.modular.system.service.IRouteInfoService;
import com.casic.missiles.modular.system.util.ReturnUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -18,6 +19,8 @@
@RequestMapping("/device")
public class ReceiveDeviceDataController {
+ private final IRouteInfoService iRouteInfoService;
+
@ApiOperation("接收设备数据入口")
@PostMapping("/receiveData")
@ResponseBody
@@ -30,6 +33,7 @@
log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString());
CompletableFuture.runAsync(() -> {
+ iRouteInfoService.processData(map);
// ResponseResolver.makeResponse(map);
}
);
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java
new file mode 100644
index 0000000..4e34369
--- /dev/null
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java
@@ -0,0 +1,70 @@
+package com.casic.missiles.modular.system.netty;
+
+import cn.hutool.core.util.ObjectUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 全局变量,用于维护设备的登录信息
+ */
+@Slf4j
+@Component
+public class DeviceCommon {
+
+ public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500);
+ public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>();
+
+
+ public static Channel getChannelByName(String devCode) {
+ if (CollectionUtils.isEmpty(devcodeMap)) {
+ return null;
+ }
+ String channelId = devcodeMap.get(devCode);
+ if (ObjectUtil.isNotEmpty(channelId)) {
+ return CHANNEL_MAP.get(channelId);
+ }
+ return null;
+ }
+
+ /**
+ * 将channel和对应的name添加到ConcurrentHashMap
+ */
+ public static void addChannel(String channelId, Channel channel) {
+ CHANNEL_MAP.put(channelId, channel);
+ }
+
+ /**
+ * 移除掉name对应的channel
+ */
+ public static boolean removeChannelByName(String channelId) {
+ if (CHANNEL_MAP.containsKey(channelId)) {
+ CHANNEL_MAP.remove(channelId);
+ if (devcodeMap.containsValue(channelId)) {
+ devcodeMap.values().removeIf(value -> value.equals(channelId));
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public static boolean sendMsg(String devCode, String msg) {
+ log.info("****收到控制指令*****"+devCode+"---->"+msg);
+ try {
+ Channel channel = getChannelByName(devCode);
+ if (null != channel) {
+ channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8)));
+ return true;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ log.info("****指令发送失败*****"+devCode+"---->"+msg);
+ return false;
+ }
+}
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java
new file mode 100644
index 0000000..c197b4f
--- /dev/null
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java
@@ -0,0 +1,24 @@
+package com.casic.missiles.modular.system.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * @description: 将从接口取到的数据编码
+ * @author: Stone
+ * @create: 2019-01-11 15:15
+ **/
+@Slf4j
+public class HjtDecoder extends MessageToMessageDecoder {
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List