diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java new file mode 100644 index 0000000..11878ff --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java @@ -0,0 +1,101 @@ +package com.casic.missiles.modular.system.netty; + + +import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class RobotChannelHandler extends ChannelInboundHandlerAdapter { + //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc +// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + + private static RouteInfoServiceImpl routeInfoService; + + public RobotChannelHandler() { + } + + @Autowired + public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) { + this.routeInfoService = routeInfoService; + } + + /** + * 服务器端收到任何一个客户端的消息都会触发这个方法 + * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + Channel channel = ctx.channel(); +// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8))); + executor.submit(new Runnable() { + @Override + public void run() { + String encryptData = msg.toString(); + DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText()); + System.out.println("接收到加密数据:" + encryptData); + routeInfoService.getMsg(1L,encryptData); +// String decryptData = RSAUtil.getDecryptMsg(encryptData); +// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) { +// String devCode = decryptData.split(",")[0]; +// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText()); +// dataPanGasService.process(decryptData); +// } + } + }); + } + + //表示服务端与客户端连接建立 + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); //其实相当于一个connection + + /** + * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush + * + * 先去广播,再将自己加入到channelGroup中 + */ +// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n"); +// channelGroup.add(channel); + DeviceCommon.addChannel(channel.id().asLongText(), channel); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + DeviceCommon.removeChannelByName(channel.id().asLongText()); + } + + //连接处于活动状态 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 上线了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 下线了"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} \ No newline at end of file diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java new file mode 100644 index 0000000..11878ff --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java @@ -0,0 +1,101 @@ +package com.casic.missiles.modular.system.netty; + + +import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class RobotChannelHandler extends ChannelInboundHandlerAdapter { + //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc +// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + + private static RouteInfoServiceImpl routeInfoService; + + public RobotChannelHandler() { + } + + @Autowired + public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) { + this.routeInfoService = routeInfoService; + } + + /** + * 服务器端收到任何一个客户端的消息都会触发这个方法 + * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + Channel channel = ctx.channel(); +// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8))); + executor.submit(new Runnable() { + @Override + public void run() { + String encryptData = msg.toString(); + DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText()); + System.out.println("接收到加密数据:" + encryptData); + routeInfoService.getMsg(1L,encryptData); +// String decryptData = RSAUtil.getDecryptMsg(encryptData); +// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) { +// String devCode = decryptData.split(",")[0]; +// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText()); +// dataPanGasService.process(decryptData); +// } + } + }); + } + + //表示服务端与客户端连接建立 + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); //其实相当于一个connection + + /** + * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush + * + * 先去广播,再将自己加入到channelGroup中 + */ +// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n"); +// channelGroup.add(channel); + DeviceCommon.addChannel(channel.id().asLongText(), channel); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + DeviceCommon.removeChannelByName(channel.id().asLongText()); + } + + //连接处于活动状态 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 上线了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 下线了"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java new file mode 100644 index 0000000..fd21dee --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java @@ -0,0 +1,25 @@ +package com.casic.missiles.modular.system.netty; + +import com.casic.missiles.core.base.controller.ExportController; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +@Api(tags = "test") +@RequiredArgsConstructor +@RestController +@RequestMapping("/test") +public class TestController extends ExportController { + @PostMapping("/test") + public void repairLog() { + Channel channel = DeviceCommon.getChannelByName("1231313123"); + channel.writeAndFlush(Unpooled.copiedBuffer("send command".getBytes(StandardCharsets.UTF_8))); + } + +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java new file mode 100644 index 0000000..11878ff --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java @@ -0,0 +1,101 @@ +package com.casic.missiles.modular.system.netty; + + +import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class RobotChannelHandler extends ChannelInboundHandlerAdapter { + //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc +// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + + private static RouteInfoServiceImpl routeInfoService; + + public RobotChannelHandler() { + } + + @Autowired + public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) { + this.routeInfoService = routeInfoService; + } + + /** + * 服务器端收到任何一个客户端的消息都会触发这个方法 + * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + Channel channel = ctx.channel(); +// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8))); + executor.submit(new Runnable() { + @Override + public void run() { + String encryptData = msg.toString(); + DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText()); + System.out.println("接收到加密数据:" + encryptData); + routeInfoService.getMsg(1L,encryptData); +// String decryptData = RSAUtil.getDecryptMsg(encryptData); +// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) { +// String devCode = decryptData.split(",")[0]; +// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText()); +// dataPanGasService.process(decryptData); +// } + } + }); + } + + //表示服务端与客户端连接建立 + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); //其实相当于一个connection + + /** + * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush + * + * 先去广播,再将自己加入到channelGroup中 + */ +// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n"); +// channelGroup.add(channel); + DeviceCommon.addChannel(channel.id().asLongText(), channel); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + DeviceCommon.removeChannelByName(channel.id().asLongText()); + } + + //连接处于活动状态 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 上线了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 下线了"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java new file mode 100644 index 0000000..fd21dee --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java @@ -0,0 +1,25 @@ +package com.casic.missiles.modular.system.netty; + +import com.casic.missiles.core.base.controller.ExportController; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +@Api(tags = "test") +@RequiredArgsConstructor +@RestController +@RequestMapping("/test") +public class TestController extends ExportController { + @PostMapping("/test") + public void repairLog() { + Channel channel = DeviceCommon.getChannelByName("1231313123"); + channel.writeAndFlush(Unpooled.copiedBuffer("send command".getBytes(StandardCharsets.UTF_8))); + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java index 814ffde..1658f2e 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.extension.service.IService; import java.util.List; +import java.util.Map; /** *

@@ -25,6 +26,8 @@ void control(String command,Long robotDogId); + void processData(Map map); + Page pageList(Page page , RouteInfoDTO requestDTO); diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java new file mode 100644 index 0000000..11878ff --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java @@ -0,0 +1,101 @@ +package com.casic.missiles.modular.system.netty; + + +import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class RobotChannelHandler extends ChannelInboundHandlerAdapter { + //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc +// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + + private static RouteInfoServiceImpl routeInfoService; + + public RobotChannelHandler() { + } + + @Autowired + public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) { + this.routeInfoService = routeInfoService; + } + + /** + * 服务器端收到任何一个客户端的消息都会触发这个方法 + * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + Channel channel = ctx.channel(); +// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8))); + executor.submit(new Runnable() { + @Override + public void run() { + String encryptData = msg.toString(); + DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText()); + System.out.println("接收到加密数据:" + encryptData); + routeInfoService.getMsg(1L,encryptData); +// String decryptData = RSAUtil.getDecryptMsg(encryptData); +// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) { +// String devCode = decryptData.split(",")[0]; +// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText()); +// dataPanGasService.process(decryptData); +// } + } + }); + } + + //表示服务端与客户端连接建立 + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); //其实相当于一个connection + + /** + * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush + * + * 先去广播,再将自己加入到channelGroup中 + */ +// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n"); +// channelGroup.add(channel); + DeviceCommon.addChannel(channel.id().asLongText(), channel); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + DeviceCommon.removeChannelByName(channel.id().asLongText()); + } + + //连接处于活动状态 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 上线了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 下线了"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java new file mode 100644 index 0000000..fd21dee --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java @@ -0,0 +1,25 @@ +package com.casic.missiles.modular.system.netty; + +import com.casic.missiles.core.base.controller.ExportController; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +@Api(tags = "test") +@RequiredArgsConstructor +@RestController +@RequestMapping("/test") +public class TestController extends ExportController { + @PostMapping("/test") + public void repairLog() { + Channel channel = DeviceCommon.getChannelByName("1231313123"); + channel.writeAndFlush(Unpooled.copiedBuffer("send command".getBytes(StandardCharsets.UTF_8))); + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java index 814ffde..1658f2e 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.extension.service.IService; import java.util.List; +import java.util.Map; /** *

@@ -25,6 +26,8 @@ void control(String command,Long robotDogId); + void processData(Map map); + Page pageList(Page page , RouteInfoDTO requestDTO); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java index 07c7ca3..ae8438a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java @@ -1,19 +1,23 @@ package com.casic.missiles.modular.system.service.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.casic.missiles.core.common.service.ICommonFileService; import com.casic.missiles.modular.system.dao.PatrolLogMapper; import com.casic.missiles.modular.system.entity.PatrolLog; import com.casic.missiles.modular.system.service.IPatrolLogService; import com.casic.missiles.modular.system.util.CommonUtil; +import com.casic.missiles.modular.system.util.HttpClientUtils; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -34,6 +38,9 @@ @Resource private ICommonFileService commonFileService; + @Value("${casic.brPushUrl}") + private String brPushUrl; + /** * 存储算法上报的消息 * @@ -47,7 +54,7 @@ String picture = CommonUtil.getString(map, "picture"); String picturePath = ""; if (ObjectUtil.isNotEmpty(picture)) { - picturePath = commonFileService.saveFileBase64Image("data:image/png;base64,"+picture); + picturePath = commonFileService.saveFileBase64Image("data:image/png;base64," + picture); } patrolLogList.add(new PatrolLog(1L, CommonUtil.getString(map, "reportType"), CommonUtil.getString(map, "reportContent"), @@ -60,6 +67,8 @@ //推送第三方 CompletableFuture.runAsync(() -> { // ResponseResolver.makeResponse(map); + Map parasMap = new HashMap<>(); + HttpClientUtils.post(brPushUrl, JSON.toJSONString(parasMap)); } ); } catch (Exception e) { diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java new file mode 100644 index 0000000..11878ff --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java @@ -0,0 +1,101 @@ +package com.casic.missiles.modular.system.netty; + + +import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class RobotChannelHandler extends ChannelInboundHandlerAdapter { + //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc +// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + + private static RouteInfoServiceImpl routeInfoService; + + public RobotChannelHandler() { + } + + @Autowired + public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) { + this.routeInfoService = routeInfoService; + } + + /** + * 服务器端收到任何一个客户端的消息都会触发这个方法 + * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + Channel channel = ctx.channel(); +// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8))); + executor.submit(new Runnable() { + @Override + public void run() { + String encryptData = msg.toString(); + DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText()); + System.out.println("接收到加密数据:" + encryptData); + routeInfoService.getMsg(1L,encryptData); +// String decryptData = RSAUtil.getDecryptMsg(encryptData); +// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) { +// String devCode = decryptData.split(",")[0]; +// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText()); +// dataPanGasService.process(decryptData); +// } + } + }); + } + + //表示服务端与客户端连接建立 + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); //其实相当于一个connection + + /** + * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush + * + * 先去广播,再将自己加入到channelGroup中 + */ +// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n"); +// channelGroup.add(channel); + DeviceCommon.addChannel(channel.id().asLongText(), channel); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + DeviceCommon.removeChannelByName(channel.id().asLongText()); + } + + //连接处于活动状态 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 上线了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 下线了"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java new file mode 100644 index 0000000..fd21dee --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java @@ -0,0 +1,25 @@ +package com.casic.missiles.modular.system.netty; + +import com.casic.missiles.core.base.controller.ExportController; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +@Api(tags = "test") +@RequiredArgsConstructor +@RestController +@RequestMapping("/test") +public class TestController extends ExportController { + @PostMapping("/test") + public void repairLog() { + Channel channel = DeviceCommon.getChannelByName("1231313123"); + channel.writeAndFlush(Unpooled.copiedBuffer("send command".getBytes(StandardCharsets.UTF_8))); + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java index 814ffde..1658f2e 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.extension.service.IService; import java.util.List; +import java.util.Map; /** *

@@ -25,6 +26,8 @@ void control(String command,Long robotDogId); + void processData(Map map); + Page pageList(Page page , RouteInfoDTO requestDTO); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java index 07c7ca3..ae8438a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java @@ -1,19 +1,23 @@ package com.casic.missiles.modular.system.service.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.casic.missiles.core.common.service.ICommonFileService; import com.casic.missiles.modular.system.dao.PatrolLogMapper; import com.casic.missiles.modular.system.entity.PatrolLog; import com.casic.missiles.modular.system.service.IPatrolLogService; import com.casic.missiles.modular.system.util.CommonUtil; +import com.casic.missiles.modular.system.util.HttpClientUtils; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -34,6 +38,9 @@ @Resource private ICommonFileService commonFileService; + @Value("${casic.brPushUrl}") + private String brPushUrl; + /** * 存储算法上报的消息 * @@ -47,7 +54,7 @@ String picture = CommonUtil.getString(map, "picture"); String picturePath = ""; if (ObjectUtil.isNotEmpty(picture)) { - picturePath = commonFileService.saveFileBase64Image("data:image/png;base64,"+picture); + picturePath = commonFileService.saveFileBase64Image("data:image/png;base64," + picture); } patrolLogList.add(new PatrolLog(1L, CommonUtil.getString(map, "reportType"), CommonUtil.getString(map, "reportContent"), @@ -60,6 +67,8 @@ //推送第三方 CompletableFuture.runAsync(() -> { // ResponseResolver.makeResponse(map); + Map parasMap = new HashMap<>(); + HttpClientUtils.post(brPushUrl, JSON.toJSONString(parasMap)); } ); } catch (Exception e) { diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java index 7d85040..501dedd 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java @@ -1,21 +1,26 @@ package com.casic.missiles.modular.system.service.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.casic.missiles.modular.system.dao.RouteInfoMapper; import com.casic.missiles.modular.system.dto.RouteInfoDTO; import com.casic.missiles.modular.system.entity.RouteInfo; import com.casic.missiles.modular.system.entity.RouteInfoDetail; +import com.casic.missiles.modular.system.netty.DeviceCommon; import com.casic.missiles.modular.system.service.IRouteInfoDetailService; import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.SnowFlakeUtil; +import com.casic.missiles.modular.system.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Arrays; import java.util.List; +import java.util.Map; /** *

@@ -30,7 +35,8 @@ public class RouteInfoServiceImpl extends ServiceImpl implements IRouteInfoService { private final IRouteInfoDetailService detailService; - + private final DeviceCommon deviceCommon; + private final WebSocket webSocket; @Transactional @Override public boolean saveRouteInfo(RouteInfo routeInfo) { @@ -89,6 +95,7 @@ public void control(String command, Long robotDogId) { switch (command) { case "startRecordMap": + deviceCommon.sendMsg("111","startRecordMap"); // this.deviceInit(detectorId); break; case "endRecordMap": @@ -109,4 +116,34 @@ break; } } + + public void getMsg(Long detectorId, String msg) { + + //TOdO:处理数据 + + System.out.println("getMsg"+ msg); + //发送指令给设备 + //发送指令给设备 + } + + @Override + public void processData(Map map) { + + String content = JSON.toJSONString(map); + JSONObject json = JSONObject.parseObject(content); + String type = json.getString("type"); + //1:GPS;2:巡航预置点信息;3:巡航记录点信息;4:云图;5: 电量; + //{"type":2,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212","route_x":"1","route_y":"1","route_number":1},{"gps_x:1212","gps_y":"1212","route_x":"1","route_y":"1","route_number":2}]} + //{"type":1,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212"}] + //{"type":3,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212"}] + //{"type":5,"devcode":"1212",data:[{"cell:1212","gps_y":1212"}] + + + JSONObject msg = new JSONObject(); + msg.put("devcode", "1212"); + msg.put("gps_x", "121212"); + msg.put("gps_y", "322323"); + webSocket.sendAllMessage(msg.toJSONString()); + + } } diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java new file mode 100644 index 0000000..11878ff --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java @@ -0,0 +1,101 @@ +package com.casic.missiles.modular.system.netty; + + +import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class RobotChannelHandler extends ChannelInboundHandlerAdapter { + //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc +// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + + private static RouteInfoServiceImpl routeInfoService; + + public RobotChannelHandler() { + } + + @Autowired + public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) { + this.routeInfoService = routeInfoService; + } + + /** + * 服务器端收到任何一个客户端的消息都会触发这个方法 + * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + Channel channel = ctx.channel(); +// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8))); + executor.submit(new Runnable() { + @Override + public void run() { + String encryptData = msg.toString(); + DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText()); + System.out.println("接收到加密数据:" + encryptData); + routeInfoService.getMsg(1L,encryptData); +// String decryptData = RSAUtil.getDecryptMsg(encryptData); +// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) { +// String devCode = decryptData.split(",")[0]; +// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText()); +// dataPanGasService.process(decryptData); +// } + } + }); + } + + //表示服务端与客户端连接建立 + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); //其实相当于一个connection + + /** + * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush + * + * 先去广播,再将自己加入到channelGroup中 + */ +// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n"); +// channelGroup.add(channel); + DeviceCommon.addChannel(channel.id().asLongText(), channel); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + DeviceCommon.removeChannelByName(channel.id().asLongText()); + } + + //连接处于活动状态 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 上线了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 下线了"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java new file mode 100644 index 0000000..fd21dee --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java @@ -0,0 +1,25 @@ +package com.casic.missiles.modular.system.netty; + +import com.casic.missiles.core.base.controller.ExportController; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +@Api(tags = "test") +@RequiredArgsConstructor +@RestController +@RequestMapping("/test") +public class TestController extends ExportController { + @PostMapping("/test") + public void repairLog() { + Channel channel = DeviceCommon.getChannelByName("1231313123"); + channel.writeAndFlush(Unpooled.copiedBuffer("send command".getBytes(StandardCharsets.UTF_8))); + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java index 814ffde..1658f2e 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.extension.service.IService; import java.util.List; +import java.util.Map; /** *

@@ -25,6 +26,8 @@ void control(String command,Long robotDogId); + void processData(Map map); + Page pageList(Page page , RouteInfoDTO requestDTO); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java index 07c7ca3..ae8438a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java @@ -1,19 +1,23 @@ package com.casic.missiles.modular.system.service.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.casic.missiles.core.common.service.ICommonFileService; import com.casic.missiles.modular.system.dao.PatrolLogMapper; import com.casic.missiles.modular.system.entity.PatrolLog; import com.casic.missiles.modular.system.service.IPatrolLogService; import com.casic.missiles.modular.system.util.CommonUtil; +import com.casic.missiles.modular.system.util.HttpClientUtils; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -34,6 +38,9 @@ @Resource private ICommonFileService commonFileService; + @Value("${casic.brPushUrl}") + private String brPushUrl; + /** * 存储算法上报的消息 * @@ -47,7 +54,7 @@ String picture = CommonUtil.getString(map, "picture"); String picturePath = ""; if (ObjectUtil.isNotEmpty(picture)) { - picturePath = commonFileService.saveFileBase64Image("data:image/png;base64,"+picture); + picturePath = commonFileService.saveFileBase64Image("data:image/png;base64," + picture); } patrolLogList.add(new PatrolLog(1L, CommonUtil.getString(map, "reportType"), CommonUtil.getString(map, "reportContent"), @@ -60,6 +67,8 @@ //推送第三方 CompletableFuture.runAsync(() -> { // ResponseResolver.makeResponse(map); + Map parasMap = new HashMap<>(); + HttpClientUtils.post(brPushUrl, JSON.toJSONString(parasMap)); } ); } catch (Exception e) { diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java index 7d85040..501dedd 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java @@ -1,21 +1,26 @@ package com.casic.missiles.modular.system.service.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.casic.missiles.modular.system.dao.RouteInfoMapper; import com.casic.missiles.modular.system.dto.RouteInfoDTO; import com.casic.missiles.modular.system.entity.RouteInfo; import com.casic.missiles.modular.system.entity.RouteInfoDetail; +import com.casic.missiles.modular.system.netty.DeviceCommon; import com.casic.missiles.modular.system.service.IRouteInfoDetailService; import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.SnowFlakeUtil; +import com.casic.missiles.modular.system.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Arrays; import java.util.List; +import java.util.Map; /** *

@@ -30,7 +35,8 @@ public class RouteInfoServiceImpl extends ServiceImpl implements IRouteInfoService { private final IRouteInfoDetailService detailService; - + private final DeviceCommon deviceCommon; + private final WebSocket webSocket; @Transactional @Override public boolean saveRouteInfo(RouteInfo routeInfo) { @@ -89,6 +95,7 @@ public void control(String command, Long robotDogId) { switch (command) { case "startRecordMap": + deviceCommon.sendMsg("111","startRecordMap"); // this.deviceInit(detectorId); break; case "endRecordMap": @@ -109,4 +116,34 @@ break; } } + + public void getMsg(Long detectorId, String msg) { + + //TOdO:处理数据 + + System.out.println("getMsg"+ msg); + //发送指令给设备 + //发送指令给设备 + } + + @Override + public void processData(Map map) { + + String content = JSON.toJSONString(map); + JSONObject json = JSONObject.parseObject(content); + String type = json.getString("type"); + //1:GPS;2:巡航预置点信息;3:巡航记录点信息;4:云图;5: 电量; + //{"type":2,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212","route_x":"1","route_y":"1","route_number":1},{"gps_x:1212","gps_y":"1212","route_x":"1","route_y":"1","route_number":2}]} + //{"type":1,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212"}] + //{"type":3,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212"}] + //{"type":5,"devcode":"1212",data:[{"cell:1212","gps_y":1212"}] + + + JSONObject msg = new JSONObject(); + msg.put("devcode", "1212"); + msg.put("gps_x", "121212"); + msg.put("gps_y", "322323"); + webSocket.sendAllMessage(msg.toJSONString()); + + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/util/WebSocket.java b/casic-server/src/main/java/com/casic/missiles/modular/system/util/WebSocket.java new file mode 100644 index 0000000..0f499e6 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.modular.system.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 67e7aea..e7423c0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -75,6 +75,17 @@ commons-httpclient 3.1 + + + io.netty + netty-all + + + + org.springframework.boot + spring-boot-starter-websocket + 2.4.5 + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java new file mode 100644 index 0000000..f2c946f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.casic.missiles.modular.system.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +// tomcat启动无需配置 +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java index 80e4f5c..e0bc9c6 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/ReceiveDeviceDataController.java @@ -1,6 +1,7 @@ package com.casic.missiles.modular.system.controller; import com.casic.missiles.modular.system.dto.ReturnDTO; +import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.ReturnUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -18,6 +19,8 @@ @RequestMapping("/device") public class ReceiveDeviceDataController { + private final IRouteInfoService iRouteInfoService; + @ApiOperation("接收设备数据入口") @PostMapping("/receiveData") @ResponseBody @@ -30,6 +33,7 @@ log.info(">>>>>> 接收数据 <<<<<<: {}", map.toString()); CompletableFuture.runAsync(() -> { + iRouteInfoService.processData(map); // ResponseResolver.makeResponse(map); } ); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java new file mode 100644 index 0000000..4e34369 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/DeviceCommon.java @@ -0,0 +1,70 @@ +package com.casic.missiles.modular.system.netty; + +import cn.hutool.core.util.ObjectUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 全局变量,用于维护设备的登录信息 + */ +@Slf4j +@Component +public class DeviceCommon { + + public static volatile ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(500); + public static volatile ConcurrentHashMap devcodeMap = new ConcurrentHashMap<>(); + + + public static Channel getChannelByName(String devCode) { + if (CollectionUtils.isEmpty(devcodeMap)) { + return null; + } + String channelId = devcodeMap.get(devCode); + if (ObjectUtil.isNotEmpty(channelId)) { + return CHANNEL_MAP.get(channelId); + } + return null; + } + + /** + * 将channel和对应的name添加到ConcurrentHashMap + */ + public static void addChannel(String channelId, Channel channel) { + CHANNEL_MAP.put(channelId, channel); + } + + /** + * 移除掉name对应的channel + */ + public static boolean removeChannelByName(String channelId) { + if (CHANNEL_MAP.containsKey(channelId)) { + CHANNEL_MAP.remove(channelId); + if (devcodeMap.containsValue(channelId)) { + devcodeMap.values().removeIf(value -> value.equals(channelId)); + } + return true; + } + return false; + } + + public static boolean sendMsg(String devCode, String msg) { + log.info("****收到控制指令*****"+devCode+"---->"+msg); + try { + Channel channel = getChannelByName(devCode); + if (null != channel) { + channel.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8))); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + log.info("****指令发送失败*****"+devCode+"---->"+msg); + return false; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java new file mode 100644 index 0000000..c197b4f --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/HjtDecoder.java @@ -0,0 +1,24 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * @description: 将从接口取到的数据编码 + * @author: Stone + * @create: 2019-01-11 15:15 + **/ +@Slf4j +public class HjtDecoder extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.info("hexBytes : " + ByteBufUtil.hexDump(msg) + ";" + " String : " + msg.toString(Charset.defaultCharset())); + out.add(msg.toString(Charset.defaultCharset())); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java new file mode 100644 index 0000000..52b8451 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServer.java @@ -0,0 +1,60 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +@Component +public class NettyServer { + + //负责处理接受进来的链接 + private EventLoopGroup bossGroup; + //负责处理已经被接收的连接上的I/O操作 + private EventLoopGroup workerGroup; + //在这个场景中,它表示服务器的绑定操作的结果 + private ChannelFuture future; + + @EventListener(ApplicationStartedEvent.class) + public void startServer() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + //创建ServerBootstrap,这个类封装了服务器端的网络配置,使得我们可以轻松地设置服务器参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer()) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, 1024) // 设置的ServerChannel的一些选项 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定端口并开始接受进来的连接 + future = bootstrap.bind(7000).sync(); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stopServer() { + if (future != null && !future.isDone()) { + future.cancel(true); + } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java new file mode 100644 index 0000000..9f75fc8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/NettyServerInitializer.java @@ -0,0 +1,28 @@ +package com.casic.missiles.modular.system.netty; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +public class NettyServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 添加一个字符串解码器,用于将接收到的ByteBuf转换成字符串 + // 这里假设使用的是UTF-8字符集 + pipeline.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); + + // 添加一个字符串编码器,用于将发送的字符串转换成ByteBuf + // 这样服务器发送字符串时,客户端可以直接接收到字符串 + pipeline.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); + + // 添加自定义的ChannelInboundHandlerAdapter来处理业务逻辑 + pipeline.addLast("handler", new RobotChannelHandler()); + } +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java new file mode 100644 index 0000000..11878ff --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/RobotChannelHandler.java @@ -0,0 +1,101 @@ +package com.casic.missiles.modular.system.netty; + + +import com.casic.missiles.modular.system.service.impl.RouteInfoServiceImpl; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +public class RobotChannelHandler extends ChannelInboundHandlerAdapter { + //保留所有与服务器建立连接的channel对象,这边的GlobalEventExecutor在写博客的时候解释一下,看其doc +// private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + + private static RouteInfoServiceImpl routeInfoService; + + public RobotChannelHandler() { + } + + @Autowired + public void setDataPanGasService(RouteInfoServiceImpl routeInfoService) { + this.routeInfoService = routeInfoService; + } + + /** + * 服务器端收到任何一个客户端的消息都会触发这个方法 + * 连接的客户端向服务器端发送消息,那么其他客户端都收到此消息,自己收到【自己】+消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + Channel channel = ctx.channel(); +// channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes(StandardCharsets.UTF_8))); + executor.submit(new Runnable() { + @Override + public void run() { + String encryptData = msg.toString(); + DeviceCommon.devcodeMap.put(encryptData, channel.id().asLongText()); + System.out.println("接收到加密数据:" + encryptData); + routeInfoService.getMsg(1L,encryptData); +// String decryptData = RSAUtil.getDecryptMsg(encryptData); +// if (ObjectUtil.isNotEmpty(decryptData) && decryptData.contains(",")) { +// String devCode = decryptData.split(",")[0]; +// DeviceCommon.devcodeMap.put(devCode, channel.id().asLongText()); +// dataPanGasService.process(decryptData); +// } + } + }); + } + + //表示服务端与客户端连接建立 + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); //其实相当于一个connection + + /** + * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush + * + * 先去广播,再将自己加入到channelGroup中 + */ +// channelGroup.writeAndFlush(" 【服务器】 -" + channel.remoteAddress() + " 加入\n"); +// channelGroup.add(channel); + DeviceCommon.addChannel(channel.id().asLongText(), channel); + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + DeviceCommon.removeChannelByName(channel.id().asLongText()); + } + + //连接处于活动状态 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 上线了"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + " 下线了"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java new file mode 100644 index 0000000..fd21dee --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/netty/TestController.java @@ -0,0 +1,25 @@ +package com.casic.missiles.modular.system.netty; + +import com.casic.missiles.core.base.controller.ExportController; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.nio.charset.StandardCharsets; + +@Api(tags = "test") +@RequiredArgsConstructor +@RestController +@RequestMapping("/test") +public class TestController extends ExportController { + @PostMapping("/test") + public void repairLog() { + Channel channel = DeviceCommon.getChannelByName("1231313123"); + channel.writeAndFlush(Unpooled.copiedBuffer("send command".getBytes(StandardCharsets.UTF_8))); + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java index 814ffde..1658f2e 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/IRouteInfoService.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.extension.service.IService; import java.util.List; +import java.util.Map; /** *

@@ -25,6 +26,8 @@ void control(String command,Long robotDogId); + void processData(Map map); + Page pageList(Page page , RouteInfoDTO requestDTO); diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java index 07c7ca3..ae8438a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/PatrolLogServiceImpl.java @@ -1,19 +1,23 @@ package com.casic.missiles.modular.system.service.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.casic.missiles.core.common.service.ICommonFileService; import com.casic.missiles.modular.system.dao.PatrolLogMapper; import com.casic.missiles.modular.system.entity.PatrolLog; import com.casic.missiles.modular.system.service.IPatrolLogService; import com.casic.missiles.modular.system.util.CommonUtil; +import com.casic.missiles.modular.system.util.HttpClientUtils; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -34,6 +38,9 @@ @Resource private ICommonFileService commonFileService; + @Value("${casic.brPushUrl}") + private String brPushUrl; + /** * 存储算法上报的消息 * @@ -47,7 +54,7 @@ String picture = CommonUtil.getString(map, "picture"); String picturePath = ""; if (ObjectUtil.isNotEmpty(picture)) { - picturePath = commonFileService.saveFileBase64Image("data:image/png;base64,"+picture); + picturePath = commonFileService.saveFileBase64Image("data:image/png;base64," + picture); } patrolLogList.add(new PatrolLog(1L, CommonUtil.getString(map, "reportType"), CommonUtil.getString(map, "reportContent"), @@ -60,6 +67,8 @@ //推送第三方 CompletableFuture.runAsync(() -> { // ResponseResolver.makeResponse(map); + Map parasMap = new HashMap<>(); + HttpClientUtils.post(brPushUrl, JSON.toJSONString(parasMap)); } ); } catch (Exception e) { diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java index 7d85040..501dedd 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/RouteInfoServiceImpl.java @@ -1,21 +1,26 @@ package com.casic.missiles.modular.system.service.impl; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.casic.missiles.modular.system.dao.RouteInfoMapper; import com.casic.missiles.modular.system.dto.RouteInfoDTO; import com.casic.missiles.modular.system.entity.RouteInfo; import com.casic.missiles.modular.system.entity.RouteInfoDetail; +import com.casic.missiles.modular.system.netty.DeviceCommon; import com.casic.missiles.modular.system.service.IRouteInfoDetailService; import com.casic.missiles.modular.system.service.IRouteInfoService; import com.casic.missiles.modular.system.util.SnowFlakeUtil; +import com.casic.missiles.modular.system.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Arrays; import java.util.List; +import java.util.Map; /** *

@@ -30,7 +35,8 @@ public class RouteInfoServiceImpl extends ServiceImpl implements IRouteInfoService { private final IRouteInfoDetailService detailService; - + private final DeviceCommon deviceCommon; + private final WebSocket webSocket; @Transactional @Override public boolean saveRouteInfo(RouteInfo routeInfo) { @@ -89,6 +95,7 @@ public void control(String command, Long robotDogId) { switch (command) { case "startRecordMap": + deviceCommon.sendMsg("111","startRecordMap"); // this.deviceInit(detectorId); break; case "endRecordMap": @@ -109,4 +116,34 @@ break; } } + + public void getMsg(Long detectorId, String msg) { + + //TOdO:处理数据 + + System.out.println("getMsg"+ msg); + //发送指令给设备 + //发送指令给设备 + } + + @Override + public void processData(Map map) { + + String content = JSON.toJSONString(map); + JSONObject json = JSONObject.parseObject(content); + String type = json.getString("type"); + //1:GPS;2:巡航预置点信息;3:巡航记录点信息;4:云图;5: 电量; + //{"type":2,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212","route_x":"1","route_y":"1","route_number":1},{"gps_x:1212","gps_y":"1212","route_x":"1","route_y":"1","route_number":2}]} + //{"type":1,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212"}] + //{"type":3,"devcode":"1212",data:[{"gps_x:1212","gps_y":1212"}] + //{"type":5,"devcode":"1212",data:[{"cell:1212","gps_y":1212"}] + + + JSONObject msg = new JSONObject(); + msg.put("devcode", "1212"); + msg.put("gps_x", "121212"); + msg.put("gps_y", "322323"); + webSocket.sendAllMessage(msg.toJSONString()); + + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/util/WebSocket.java b/casic-server/src/main/java/com/casic/missiles/modular/system/util/WebSocket.java new file mode 100644 index 0000000..0f499e6 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.modular.system.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-web/src/main/resources/config/application-dev.yml b/casic-web/src/main/resources/config/application-dev.yml index be5aa7e..e19ffb8 100644 --- a/casic-web/src/main/resources/config/application-dev.yml +++ b/casic-web/src/main/resources/config/application-dev.yml @@ -24,6 +24,7 @@ db: init: enable: false + brPushUrl: http://10.30.7.26:20110/monitorDataReceive/alarmPush logging: level.root: info level.com.casic: debug