diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/NettyServer.java b/src/main/java/com/casic/service/NettyServer.java
new file mode 100644
index 0000000..8a0fc28
--- /dev/null
+++ b/src/main/java/com/casic/service/NettyServer.java
@@ -0,0 +1,58 @@
+package com.casic.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * NettyServer Netty服务器配置
+ */
+public class NettyServer {
+ private final int port;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024);
+ sb.group(group, bossGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class) // 指定使用的channel
+ .localAddress(this.port)// 绑定监听端口
+ .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ System.out.println("收到新连接");
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(new MyWebSocketHandler());
+ }
+ });
+ ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
+ System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
+ cf.channel().closeFuture().sync(); // 关闭服务器通道
+ } finally {
+ group.shutdownGracefully().sync(); // 释放线程池资源
+ bossGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/NettyServer.java b/src/main/java/com/casic/service/NettyServer.java
new file mode 100644
index 0000000..8a0fc28
--- /dev/null
+++ b/src/main/java/com/casic/service/NettyServer.java
@@ -0,0 +1,58 @@
+package com.casic.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * NettyServer Netty服务器配置
+ */
+public class NettyServer {
+ private final int port;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024);
+ sb.group(group, bossGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class) // 指定使用的channel
+ .localAddress(this.port)// 绑定监听端口
+ .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ System.out.println("收到新连接");
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(new MyWebSocketHandler());
+ }
+ });
+ ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
+ System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
+ cf.channel().closeFuture().sync(); // 关闭服务器通道
+ } finally {
+ group.shutdownGracefully().sync(); // 释放线程池资源
+ bossGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Client.java b/src/main/java/com/casic/util/Client.java
new file mode 100644
index 0000000..a416600
--- /dev/null
+++ b/src/main/java/com/casic/util/Client.java
@@ -0,0 +1,34 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 客户端socket
+ */
+public class Client {
+ public static void main(String[] args) {
+ try {
+ //发送到8888端口
+ Socket socket = new Socket("127.0.0.1", 11321);
+ //输出流
+ OutputStream outputStream = socket.getOutputStream();
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.write("服务端你好,我是客户端");
+ printWriter.flush();
+ //关闭资源
+ printWriter.close();
+ outputStream.close();
+ socket.close();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/NettyServer.java b/src/main/java/com/casic/service/NettyServer.java
new file mode 100644
index 0000000..8a0fc28
--- /dev/null
+++ b/src/main/java/com/casic/service/NettyServer.java
@@ -0,0 +1,58 @@
+package com.casic.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * NettyServer Netty服务器配置
+ */
+public class NettyServer {
+ private final int port;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024);
+ sb.group(group, bossGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class) // 指定使用的channel
+ .localAddress(this.port)// 绑定监听端口
+ .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ System.out.println("收到新连接");
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(new MyWebSocketHandler());
+ }
+ });
+ ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
+ System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
+ cf.channel().closeFuture().sync(); // 关闭服务器通道
+ } finally {
+ group.shutdownGracefully().sync(); // 释放线程池资源
+ bossGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Client.java b/src/main/java/com/casic/util/Client.java
new file mode 100644
index 0000000..a416600
--- /dev/null
+++ b/src/main/java/com/casic/util/Client.java
@@ -0,0 +1,34 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 客户端socket
+ */
+public class Client {
+ public static void main(String[] args) {
+ try {
+ //发送到8888端口
+ Socket socket = new Socket("127.0.0.1", 11321);
+ //输出流
+ OutputStream outputStream = socket.getOutputStream();
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.write("服务端你好,我是客户端");
+ printWriter.flush();
+ //关闭资源
+ printWriter.close();
+ outputStream.close();
+ socket.close();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/casic/util/HttpClientUtil.java b/src/main/java/com/casic/util/HttpClientUtil.java
new file mode 100644
index 0000000..9981414
--- /dev/null
+++ b/src/main/java/com/casic/util/HttpClientUtil.java
@@ -0,0 +1,134 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+public class HttpClientUtil {
+
+ public static String doGet(String url, Map param) {
+
+ // 创建Httpclient对象
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+
+ String resultString = "";
+ CloseableHttpResponse response = null;
+ try {
+ // 创建uri
+ URIBuilder builder = new URIBuilder(url);
+ if (param != null) {
+ for (String key : param.keySet()) {
+ builder.addParameter(key, param.get(key));
+ }
+ }
+ URI uri = builder.build();
+
+ // 创建http GET请求
+ HttpGet httpGet = new HttpGet(uri);
+
+ // 执行请求
+ response = httpclient.execute(httpGet);
+ // 判断返回状态是否为200
+ if (response.getStatusLine().getStatusCode() == 200) {
+ resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ return resultString;
+ }
+ public static String doGet(String url) {
+ return doGet(url, null);
+ }
+
+ public static String doPost(String url, Map param) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建参数列表
+ if (param != null) {
+ List paramList = new ArrayList<>();
+ for (String key : param.keySet()) {
+ paramList.add(new BasicNameValuePair(key, param.get(key)));
+ }
+ // 模拟表单
+ UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList,"utf-8");
+ httpPost.setEntity(entity);
+ }
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+
+ public static String doPost(String url) {
+ return doPost(url, null);
+ }
+
+ public static String doPostJson(String json, String url) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建请求内容
+ StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+ httpPost.setEntity(entity);
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/NettyServer.java b/src/main/java/com/casic/service/NettyServer.java
new file mode 100644
index 0000000..8a0fc28
--- /dev/null
+++ b/src/main/java/com/casic/service/NettyServer.java
@@ -0,0 +1,58 @@
+package com.casic.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * NettyServer Netty服务器配置
+ */
+public class NettyServer {
+ private final int port;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024);
+ sb.group(group, bossGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class) // 指定使用的channel
+ .localAddress(this.port)// 绑定监听端口
+ .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ System.out.println("收到新连接");
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(new MyWebSocketHandler());
+ }
+ });
+ ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
+ System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
+ cf.channel().closeFuture().sync(); // 关闭服务器通道
+ } finally {
+ group.shutdownGracefully().sync(); // 释放线程池资源
+ bossGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Client.java b/src/main/java/com/casic/util/Client.java
new file mode 100644
index 0000000..a416600
--- /dev/null
+++ b/src/main/java/com/casic/util/Client.java
@@ -0,0 +1,34 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 客户端socket
+ */
+public class Client {
+ public static void main(String[] args) {
+ try {
+ //发送到8888端口
+ Socket socket = new Socket("127.0.0.1", 11321);
+ //输出流
+ OutputStream outputStream = socket.getOutputStream();
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.write("服务端你好,我是客户端");
+ printWriter.flush();
+ //关闭资源
+ printWriter.close();
+ outputStream.close();
+ socket.close();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/casic/util/HttpClientUtil.java b/src/main/java/com/casic/util/HttpClientUtil.java
new file mode 100644
index 0000000..9981414
--- /dev/null
+++ b/src/main/java/com/casic/util/HttpClientUtil.java
@@ -0,0 +1,134 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+public class HttpClientUtil {
+
+ public static String doGet(String url, Map param) {
+
+ // 创建Httpclient对象
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+
+ String resultString = "";
+ CloseableHttpResponse response = null;
+ try {
+ // 创建uri
+ URIBuilder builder = new URIBuilder(url);
+ if (param != null) {
+ for (String key : param.keySet()) {
+ builder.addParameter(key, param.get(key));
+ }
+ }
+ URI uri = builder.build();
+
+ // 创建http GET请求
+ HttpGet httpGet = new HttpGet(uri);
+
+ // 执行请求
+ response = httpclient.execute(httpGet);
+ // 判断返回状态是否为200
+ if (response.getStatusLine().getStatusCode() == 200) {
+ resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ return resultString;
+ }
+ public static String doGet(String url) {
+ return doGet(url, null);
+ }
+
+ public static String doPost(String url, Map param) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建参数列表
+ if (param != null) {
+ List paramList = new ArrayList<>();
+ for (String key : param.keySet()) {
+ paramList.add(new BasicNameValuePair(key, param.get(key)));
+ }
+ // 模拟表单
+ UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList,"utf-8");
+ httpPost.setEntity(entity);
+ }
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+
+ public static String doPost(String url) {
+ return doPost(url, null);
+ }
+
+ public static String doPostJson(String json, String url) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建请求内容
+ StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+ httpPost.setEntity(entity);
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Server.java b/src/main/java/com/casic/util/Server.java
new file mode 100644
index 0000000..50c0957
--- /dev/null
+++ b/src/main/java/com/casic/util/Server.java
@@ -0,0 +1,38 @@
+package com.casic.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 服务器socket
+ */
+public class Server {
+ public static void main(String[] args) {
+ try {
+ ServerSocket serverSocket = new ServerSocket(11321);
+ System.out.println("----------------服务端执行,開始监听请求----------------");
+
+ Socket socket = serverSocket.accept();//開始监听
+ InputStream inputStream = socket.getInputStream();
+ //获取请求内容
+ String info;
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ while ((info = bufferedReader.readLine()) != null) {
+ System.out.println("我是服务端,客户端请求为:" + info);
+ }
+ //关闭资源
+ socket.shutdownInput();
+ bufferedReader.close();
+ inputStream.close();
+ socket.close();
+ serverSocket.close();
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/NettyServer.java b/src/main/java/com/casic/service/NettyServer.java
new file mode 100644
index 0000000..8a0fc28
--- /dev/null
+++ b/src/main/java/com/casic/service/NettyServer.java
@@ -0,0 +1,58 @@
+package com.casic.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * NettyServer Netty服务器配置
+ */
+public class NettyServer {
+ private final int port;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024);
+ sb.group(group, bossGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class) // 指定使用的channel
+ .localAddress(this.port)// 绑定监听端口
+ .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ System.out.println("收到新连接");
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(new MyWebSocketHandler());
+ }
+ });
+ ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
+ System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
+ cf.channel().closeFuture().sync(); // 关闭服务器通道
+ } finally {
+ group.shutdownGracefully().sync(); // 释放线程池资源
+ bossGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Client.java b/src/main/java/com/casic/util/Client.java
new file mode 100644
index 0000000..a416600
--- /dev/null
+++ b/src/main/java/com/casic/util/Client.java
@@ -0,0 +1,34 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 客户端socket
+ */
+public class Client {
+ public static void main(String[] args) {
+ try {
+ //发送到8888端口
+ Socket socket = new Socket("127.0.0.1", 11321);
+ //输出流
+ OutputStream outputStream = socket.getOutputStream();
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.write("服务端你好,我是客户端");
+ printWriter.flush();
+ //关闭资源
+ printWriter.close();
+ outputStream.close();
+ socket.close();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/casic/util/HttpClientUtil.java b/src/main/java/com/casic/util/HttpClientUtil.java
new file mode 100644
index 0000000..9981414
--- /dev/null
+++ b/src/main/java/com/casic/util/HttpClientUtil.java
@@ -0,0 +1,134 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+public class HttpClientUtil {
+
+ public static String doGet(String url, Map param) {
+
+ // 创建Httpclient对象
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+
+ String resultString = "";
+ CloseableHttpResponse response = null;
+ try {
+ // 创建uri
+ URIBuilder builder = new URIBuilder(url);
+ if (param != null) {
+ for (String key : param.keySet()) {
+ builder.addParameter(key, param.get(key));
+ }
+ }
+ URI uri = builder.build();
+
+ // 创建http GET请求
+ HttpGet httpGet = new HttpGet(uri);
+
+ // 执行请求
+ response = httpclient.execute(httpGet);
+ // 判断返回状态是否为200
+ if (response.getStatusLine().getStatusCode() == 200) {
+ resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ return resultString;
+ }
+ public static String doGet(String url) {
+ return doGet(url, null);
+ }
+
+ public static String doPost(String url, Map param) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建参数列表
+ if (param != null) {
+ List paramList = new ArrayList<>();
+ for (String key : param.keySet()) {
+ paramList.add(new BasicNameValuePair(key, param.get(key)));
+ }
+ // 模拟表单
+ UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList,"utf-8");
+ httpPost.setEntity(entity);
+ }
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+
+ public static String doPost(String url) {
+ return doPost(url, null);
+ }
+
+ public static String doPostJson(String json, String url) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建请求内容
+ StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+ httpPost.setEntity(entity);
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Server.java b/src/main/java/com/casic/util/Server.java
new file mode 100644
index 0000000..50c0957
--- /dev/null
+++ b/src/main/java/com/casic/util/Server.java
@@ -0,0 +1,38 @@
+package com.casic.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 服务器socket
+ */
+public class Server {
+ public static void main(String[] args) {
+ try {
+ ServerSocket serverSocket = new ServerSocket(11321);
+ System.out.println("----------------服务端执行,開始监听请求----------------");
+
+ Socket socket = serverSocket.accept();//開始监听
+ InputStream inputStream = socket.getInputStream();
+ //获取请求内容
+ String info;
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ while ((info = bufferedReader.readLine()) != null) {
+ System.out.println("我是服务端,客户端请求为:" + info);
+ }
+ //关闭资源
+ socket.shutdownInput();
+ bufferedReader.close();
+ inputStream.close();
+ socket.close();
+ serverSocket.close();
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/ServerSocketUtil.java b/src/main/java/com/casic/util/ServerSocketUtil.java
new file mode 100644
index 0000000..e28ec13
--- /dev/null
+++ b/src/main/java/com/casic/util/ServerSocketUtil.java
@@ -0,0 +1,41 @@
+package com.casic.util;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.*;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+@Component
+public class ServerSocketUtil {
+
+ @Value("${casic.host}")
+ private String host;
+ @Value("${casic.port}")
+ private Integer port;
+
+ public String sendMsg(String Json) throws UnknownHostException, IOException {
+
+ // 向服务器端发送请求,服务器IP地址和服务器监听的端口号
+ Socket server = new Socket(host, port);
+ if(!server.isConnected()){
+ return "11";
+ }
+
+ OutputStream os = server.getOutputStream();
+ //把输出流封装在DataOutputStream中
+ DataOutputStream dos = new DataOutputStream(os);
+ //使用writeUTF发送字符串
+ dos.writeUTF("Legendary!");
+ dos.flush();
+ dos.close();
+ server.close();
+// System.out.println("连接已建立...");
+// // 发送消息
+// printWriter.println(Json);
+// printWriter.flush();
+ return "200";
+ }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/NettyServer.java b/src/main/java/com/casic/service/NettyServer.java
new file mode 100644
index 0000000..8a0fc28
--- /dev/null
+++ b/src/main/java/com/casic/service/NettyServer.java
@@ -0,0 +1,58 @@
+package com.casic.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * NettyServer Netty服务器配置
+ */
+public class NettyServer {
+ private final int port;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024);
+ sb.group(group, bossGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class) // 指定使用的channel
+ .localAddress(this.port)// 绑定监听端口
+ .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ System.out.println("收到新连接");
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(new MyWebSocketHandler());
+ }
+ });
+ ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
+ System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
+ cf.channel().closeFuture().sync(); // 关闭服务器通道
+ } finally {
+ group.shutdownGracefully().sync(); // 释放线程池资源
+ bossGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Client.java b/src/main/java/com/casic/util/Client.java
new file mode 100644
index 0000000..a416600
--- /dev/null
+++ b/src/main/java/com/casic/util/Client.java
@@ -0,0 +1,34 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 客户端socket
+ */
+public class Client {
+ public static void main(String[] args) {
+ try {
+ //发送到8888端口
+ Socket socket = new Socket("127.0.0.1", 11321);
+ //输出流
+ OutputStream outputStream = socket.getOutputStream();
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.write("服务端你好,我是客户端");
+ printWriter.flush();
+ //关闭资源
+ printWriter.close();
+ outputStream.close();
+ socket.close();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/casic/util/HttpClientUtil.java b/src/main/java/com/casic/util/HttpClientUtil.java
new file mode 100644
index 0000000..9981414
--- /dev/null
+++ b/src/main/java/com/casic/util/HttpClientUtil.java
@@ -0,0 +1,134 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+public class HttpClientUtil {
+
+ public static String doGet(String url, Map param) {
+
+ // 创建Httpclient对象
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+
+ String resultString = "";
+ CloseableHttpResponse response = null;
+ try {
+ // 创建uri
+ URIBuilder builder = new URIBuilder(url);
+ if (param != null) {
+ for (String key : param.keySet()) {
+ builder.addParameter(key, param.get(key));
+ }
+ }
+ URI uri = builder.build();
+
+ // 创建http GET请求
+ HttpGet httpGet = new HttpGet(uri);
+
+ // 执行请求
+ response = httpclient.execute(httpGet);
+ // 判断返回状态是否为200
+ if (response.getStatusLine().getStatusCode() == 200) {
+ resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ return resultString;
+ }
+ public static String doGet(String url) {
+ return doGet(url, null);
+ }
+
+ public static String doPost(String url, Map param) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建参数列表
+ if (param != null) {
+ List paramList = new ArrayList<>();
+ for (String key : param.keySet()) {
+ paramList.add(new BasicNameValuePair(key, param.get(key)));
+ }
+ // 模拟表单
+ UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList,"utf-8");
+ httpPost.setEntity(entity);
+ }
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+
+ public static String doPost(String url) {
+ return doPost(url, null);
+ }
+
+ public static String doPostJson(String json, String url) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建请求内容
+ StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+ httpPost.setEntity(entity);
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Server.java b/src/main/java/com/casic/util/Server.java
new file mode 100644
index 0000000..50c0957
--- /dev/null
+++ b/src/main/java/com/casic/util/Server.java
@@ -0,0 +1,38 @@
+package com.casic.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 服务器socket
+ */
+public class Server {
+ public static void main(String[] args) {
+ try {
+ ServerSocket serverSocket = new ServerSocket(11321);
+ System.out.println("----------------服务端执行,開始监听请求----------------");
+
+ Socket socket = serverSocket.accept();//開始监听
+ InputStream inputStream = socket.getInputStream();
+ //获取请求内容
+ String info;
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ while ((info = bufferedReader.readLine()) != null) {
+ System.out.println("我是服务端,客户端请求为:" + info);
+ }
+ //关闭资源
+ socket.shutdownInput();
+ bufferedReader.close();
+ inputStream.close();
+ socket.close();
+ serverSocket.close();
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/ServerSocketUtil.java b/src/main/java/com/casic/util/ServerSocketUtil.java
new file mode 100644
index 0000000..e28ec13
--- /dev/null
+++ b/src/main/java/com/casic/util/ServerSocketUtil.java
@@ -0,0 +1,41 @@
+package com.casic.util;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.*;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+@Component
+public class ServerSocketUtil {
+
+ @Value("${casic.host}")
+ private String host;
+ @Value("${casic.port}")
+ private Integer port;
+
+ public String sendMsg(String Json) throws UnknownHostException, IOException {
+
+ // 向服务器端发送请求,服务器IP地址和服务器监听的端口号
+ Socket server = new Socket(host, port);
+ if(!server.isConnected()){
+ return "11";
+ }
+
+ OutputStream os = server.getOutputStream();
+ //把输出流封装在DataOutputStream中
+ DataOutputStream dos = new DataOutputStream(os);
+ //使用writeUTF发送字符串
+ dos.writeUTF("Legendary!");
+ dos.flush();
+ dos.close();
+ server.close();
+// System.out.println("连接已建立...");
+// // 发送消息
+// printWriter.println(Json);
+// printWriter.flush();
+ return "200";
+ }
+
+}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
new file mode 100644
index 0000000..2f12836
--- /dev/null
+++ b/src/main/resources/application-dev.yml
@@ -0,0 +1,19 @@
+server:
+ port: 11316
+################### spring配置 ###################
+spring:
+ datasource:
+ driver-class-name: org.postgresql.Driver
+ url: jdbc:postgresql://111.198.10.15:11209/smartwell_zq_cs
+ username: postgres
+ password: casic203
+ session:
+ store-type: redis
+ jms:
+ pub-sub-domain: true
+logging:
+ level.root: error
+ level.com.casic: info
+ file:
+ path: logs/
+ name: missiles.log
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f5706ec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+
+ com.casic
+ data-forwarding
+ 1.0-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+ 2.1.3.RELEASE
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.1.3.RELEASE
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.1.3.RELEASE
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+ compile
+
+
+
+ com.baomidou
+ mybatis-plus-boot-starter
+ 3.4.3
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.19
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.12
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+
+ io.netty
+ netty-all
+ 4.1.36.Final
+
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.73
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.1.3.RELEASE
+
+ true
+
+ com.casic.CasicApplication
+ exec
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-war-plugin
+
+
+ false
+
+
+
+
+
+
+ src/main/resources
+
+
+ /config/*/*
+ /config/*-*.yml
+
+ true
+
+
+ src/main/resources
+
+ **/*.xml
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/casic/CasicApplication.java b/src/main/java/com/casic/CasicApplication.java
new file mode 100644
index 0000000..0887f1f
--- /dev/null
+++ b/src/main/java/com/casic/CasicApplication.java
@@ -0,0 +1,23 @@
+package com.casic;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * SpringBoot方式启动类
+ *
+ * @author cz
+ * @Date 2022/06/20 14:28
+ */
+
+@Slf4j
+@ComponentScan(basePackages= "com.casic.**")
+@SpringBootApplication
+public class CasicApplication {
+ public static void main(String[] args) {
+ log.info("CasicApplication is success!");
+ SpringApplication.run(CasicApplication.class, args);
+ }
+}
diff --git a/src/main/java/com/casic/controller/DataForwardingController.java b/src/main/java/com/casic/controller/DataForwardingController.java
new file mode 100644
index 0000000..e97ab9c
--- /dev/null
+++ b/src/main/java/com/casic/controller/DataForwardingController.java
@@ -0,0 +1,37 @@
+package com.casic.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.casic.util.HttpClientUtil;
+import com.casic.util.ServerSocketUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.Map;
+
+@Slf4j
+@RestController
+public class DataForwardingController {
+
+ @Value("${casic.url}")
+ private String url;
+ @Resource
+ private ServerSocketUtil serverSocketUtil;
+
+ @RequestMapping("/data/forwarding")
+ public String dataForwarding(@RequestBody Map map) {
+ log.info("----------"+JSONObject.toJSONString(map));
+ try {
+ HttpClientUtil.doPostJson(JSONObject.toJSONString(map),url);
+ serverSocketUtil.sendMsg(JSONObject.toJSONString(map));
+ } catch (Exception ex) {
+ log.error("错误");
+ }
+ return "200";
+ }
+
+
+}
diff --git a/src/main/java/com/casic/service/MyChannelHandlerPool.java b/src/main/java/com/casic/service/MyChannelHandlerPool.java
new file mode 100644
index 0000000..9766114
--- /dev/null
+++ b/src/main/java/com/casic/service/MyChannelHandlerPool.java
@@ -0,0 +1,19 @@
+package com.casic.service;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+/**
+ * MyChannelHandlerPool
+ * 通道组池,管理所有websocket连接
+ * @author zhengkai.blog.csdn.net
+ * @date 2019-06-12
+ */
+public class MyChannelHandlerPool {
+
+ public MyChannelHandlerPool(){}
+
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/MyWebSocketHandler.java b/src/main/java/com/casic/service/MyWebSocketHandler.java
new file mode 100644
index 0000000..db88c8c
--- /dev/null
+++ b/src/main/java/com/casic/service/MyWebSocketHandler.java
@@ -0,0 +1,82 @@
+package com.casic.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MyWebSocketHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端建立连接,通道开启!");
+
+ //添加到channelGroup通道组
+ MyChannelHandlerPool.channelGroup.add(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ System.out.println("与客户端断开连接,通道关闭!");
+ //添加到channelGroup 通道组
+ MyChannelHandlerPool.channelGroup.remove(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
+ if (null != msg && msg instanceof FullHttpRequest) {
+ FullHttpRequest request = (FullHttpRequest) msg;
+ String uri = request.uri();
+ Map paramMap=getUrlParams(uri);
+ System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
+ //如果url包含参数,需要处理
+ if(uri.contains("?")){
+ String newUri=uri.substring(0,uri.indexOf("?"));
+ System.out.println(newUri);
+ request.setUri(newUri);
+ }
+
+ }else if(msg instanceof TextWebSocketFrame){
+ //正常的TEXT消息类型
+ TextWebSocketFrame frame=(TextWebSocketFrame)msg;
+ System.out.println("客户端收到服务器数据:" +frame.text());
+ sendAllMessage(frame.text());
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+
+ }
+
+ private void sendAllMessage(String message){
+ //收到信息后,群发给所有channel
+ MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
+ }
+
+ private static Map getUrlParams(String url){
+ Map map = new HashMap<>();
+ url = url.replace("?",";");
+ if (!url.contains(";")){
+ return map;
+ }
+ if (url.split(";").length > 0){
+ String[] arr = url.split(";")[1].split("&");
+ for (String s : arr){
+ String key = s.split("=")[0];
+ String value = s.split("=")[1];
+ map.put(key,value);
+ }
+ return map;
+
+ }else{
+ return map;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/service/NettyServer.java b/src/main/java/com/casic/service/NettyServer.java
new file mode 100644
index 0000000..8a0fc28
--- /dev/null
+++ b/src/main/java/com/casic/service/NettyServer.java
@@ -0,0 +1,58 @@
+package com.casic.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * NettyServer Netty服务器配置
+ */
+public class NettyServer {
+ private final int port;
+
+ public NettyServer(int port) {
+ this.port = port;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024);
+ sb.group(group, bossGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class) // 指定使用的channel
+ .localAddress(this.port)// 绑定监听端口
+ .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ System.out.println("收到新连接");
+ //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ //以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
+ ch.pipeline().addLast(new MyWebSocketHandler());
+ }
+ });
+ ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
+ System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
+ cf.channel().closeFuture().sync(); // 关闭服务器通道
+ } finally {
+ group.shutdownGracefully().sync(); // 释放线程池资源
+ bossGroup.shutdownGracefully().sync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Client.java b/src/main/java/com/casic/util/Client.java
new file mode 100644
index 0000000..a416600
--- /dev/null
+++ b/src/main/java/com/casic/util/Client.java
@@ -0,0 +1,34 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 客户端socket
+ */
+public class Client {
+ public static void main(String[] args) {
+ try {
+ //发送到8888端口
+ Socket socket = new Socket("127.0.0.1", 11321);
+ //输出流
+ OutputStream outputStream = socket.getOutputStream();
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.write("服务端你好,我是客户端");
+ printWriter.flush();
+ //关闭资源
+ printWriter.close();
+ outputStream.close();
+ socket.close();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/casic/util/HttpClientUtil.java b/src/main/java/com/casic/util/HttpClientUtil.java
new file mode 100644
index 0000000..9981414
--- /dev/null
+++ b/src/main/java/com/casic/util/HttpClientUtil.java
@@ -0,0 +1,134 @@
+package com.casic.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+public class HttpClientUtil {
+
+ public static String doGet(String url, Map param) {
+
+ // 创建Httpclient对象
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+
+ String resultString = "";
+ CloseableHttpResponse response = null;
+ try {
+ // 创建uri
+ URIBuilder builder = new URIBuilder(url);
+ if (param != null) {
+ for (String key : param.keySet()) {
+ builder.addParameter(key, param.get(key));
+ }
+ }
+ URI uri = builder.build();
+
+ // 创建http GET请求
+ HttpGet httpGet = new HttpGet(uri);
+
+ // 执行请求
+ response = httpclient.execute(httpGet);
+ // 判断返回状态是否为200
+ if (response.getStatusLine().getStatusCode() == 200) {
+ resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ return resultString;
+ }
+ public static String doGet(String url) {
+ return doGet(url, null);
+ }
+
+ public static String doPost(String url, Map param) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建参数列表
+ if (param != null) {
+ List paramList = new ArrayList<>();
+ for (String key : param.keySet()) {
+ paramList.add(new BasicNameValuePair(key, param.get(key)));
+ }
+ // 模拟表单
+ UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList,"utf-8");
+ httpPost.setEntity(entity);
+ }
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+
+ public static String doPost(String url) {
+ return doPost(url, null);
+ }
+
+ public static String doPostJson(String json, String url) {
+ // 创建Httpclient对象
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = null;
+ String resultString = "";
+ try {
+ // 创建Http Post请求
+ HttpPost httpPost = new HttpPost(url);
+ // 创建请求内容
+ StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+ httpPost.setEntity(entity);
+ // 执行http请求
+ response = httpClient.execute(httpPost);
+ resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return resultString;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/Server.java b/src/main/java/com/casic/util/Server.java
new file mode 100644
index 0000000..50c0957
--- /dev/null
+++ b/src/main/java/com/casic/util/Server.java
@@ -0,0 +1,38 @@
+package com.casic.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * @author: njb
+ * @Date: 2020/11/26 18:49
+ * @desc: 服务器socket
+ */
+public class Server {
+ public static void main(String[] args) {
+ try {
+ ServerSocket serverSocket = new ServerSocket(11321);
+ System.out.println("----------------服务端执行,開始监听请求----------------");
+
+ Socket socket = serverSocket.accept();//開始监听
+ InputStream inputStream = socket.getInputStream();
+ //获取请求内容
+ String info;
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ while ((info = bufferedReader.readLine()) != null) {
+ System.out.println("我是服务端,客户端请求为:" + info);
+ }
+ //关闭资源
+ socket.shutdownInput();
+ bufferedReader.close();
+ inputStream.close();
+ socket.close();
+ serverSocket.close();
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/casic/util/ServerSocketUtil.java b/src/main/java/com/casic/util/ServerSocketUtil.java
new file mode 100644
index 0000000..e28ec13
--- /dev/null
+++ b/src/main/java/com/casic/util/ServerSocketUtil.java
@@ -0,0 +1,41 @@
+package com.casic.util;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.*;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+@Component
+public class ServerSocketUtil {
+
+ @Value("${casic.host}")
+ private String host;
+ @Value("${casic.port}")
+ private Integer port;
+
+ public String sendMsg(String Json) throws UnknownHostException, IOException {
+
+ // 向服务器端发送请求,服务器IP地址和服务器监听的端口号
+ Socket server = new Socket(host, port);
+ if(!server.isConnected()){
+ return "11";
+ }
+
+ OutputStream os = server.getOutputStream();
+ //把输出流封装在DataOutputStream中
+ DataOutputStream dos = new DataOutputStream(os);
+ //使用writeUTF发送字符串
+ dos.writeUTF("Legendary!");
+ dos.flush();
+ dos.close();
+ server.close();
+// System.out.println("连接已建立...");
+// // 发送消息
+// printWriter.println(Json);
+// printWriter.flush();
+ return "200";
+ }
+
+}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
new file mode 100644
index 0000000..2f12836
--- /dev/null
+++ b/src/main/resources/application-dev.yml
@@ -0,0 +1,19 @@
+server:
+ port: 11316
+################### spring配置 ###################
+spring:
+ datasource:
+ driver-class-name: org.postgresql.Driver
+ url: jdbc:postgresql://111.198.10.15:11209/smartwell_zq_cs
+ username: postgres
+ password: casic203
+ session:
+ store-type: redis
+ jms:
+ pub-sub-domain: true
+logging:
+ level.root: error
+ level.com.casic: info
+ file:
+ path: logs/
+ name: missiles.log
\ No newline at end of file
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..6f17b78
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,33 @@
+##########################################################
+################## 所有profile共有的配置 #################
+##########################################################
+spring:
+ profiles:
+ active: dev
+ servlet:
+ multipart:
+ max-file-size: 50MB
+ max-request-size: 80MB
+mybatis-plus:
+ global-config:
+ enable-sql-runner: true
+ configuration:
+ # 配置结果集属性为空时 是否映射返回结果
+ log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #打印sql语句,调试用
+ mapper-locations: classpath:mapper/*.xml
+mybatis:
+ mapper-locations: classpath:mapper/*.xml
+#mybatis-plus:
+# sql-injector: com.baomidou.mybatisplus.mapper.LogicSqlInjector
+casic:
+ device:
+ gas-dector:
+ type: 101
+ minutes: 60
+ enable-local: false
+ host: 127.0.0.1
+ port: 11320
+ url: http://127.0.0.1:11320/test
+
+# type 可燃气体的设备类型
+# minutes: 燃气外协设备上传数据的分钟数
\ No newline at end of file