diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java
index 8bf0177..a54a8d9 100644
--- a/src/main/java/com/casic/client/Client.java
+++ b/src/main/java/com/casic/client/Client.java
@@ -6,17 +6,26 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
+@Slf4j
@Component
-public class Client {
+public class Client extends Thread {
@Autowired
private ServerPort serverPort;
- public void send(String sendMsg) throws Exception {
+ private volatile String sendMsg;
+
+ public void setSendMsg(String sendMsg) {
+ this.sendMsg = sendMsg;
+ }
+
+ @Override
+ public void run() {
+ super.run();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
@@ -25,10 +34,26 @@
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(sendMsg);
+ } catch (Exception ex) {
+ log.error("发送异常");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
+// public void send(String sendMsg) throws Exception {
+// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+// try {
+// Bootstrap bootstrap = new Bootstrap();
+// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+// .handler(new Clientinitializer());
+// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
+// Channel channel = channelFuture.channel();
+// channel.writeAndFlush(sendMsg);
+// } finally {
+// eventLoopGroup.shutdownGracefully();
+// }
+// }
+
}
diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java
index 8bf0177..a54a8d9 100644
--- a/src/main/java/com/casic/client/Client.java
+++ b/src/main/java/com/casic/client/Client.java
@@ -6,17 +6,26 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
+@Slf4j
@Component
-public class Client {
+public class Client extends Thread {
@Autowired
private ServerPort serverPort;
- public void send(String sendMsg) throws Exception {
+ private volatile String sendMsg;
+
+ public void setSendMsg(String sendMsg) {
+ this.sendMsg = sendMsg;
+ }
+
+ @Override
+ public void run() {
+ super.run();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
@@ -25,10 +34,26 @@
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(sendMsg);
+ } catch (Exception ex) {
+ log.error("发送异常");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
+// public void send(String sendMsg) throws Exception {
+// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+// try {
+// Bootstrap bootstrap = new Bootstrap();
+// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+// .handler(new Clientinitializer());
+// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
+// Channel channel = channelFuture.channel();
+// channel.writeAndFlush(sendMsg);
+// } finally {
+// eventLoopGroup.shutdownGracefully();
+// }
+// }
+
}
diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
index 05e6d40..a96447c 100644
--- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
+++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
@@ -1,5 +1,7 @@
package com.casic.config.task;
+import com.casic.client.Client;
+import com.casic.config.TaskCronTimeConfig;
import com.casic.service.DeviceDataScanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -19,6 +22,10 @@
@Autowired
private List deviceDataScannerList;
+ @Autowired
+ private TaskCronTimeConfig taskCronTimeConfig;
+ @Autowired
+ private Client client;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
@@ -29,7 +36,7 @@
//设置日期任务
getMangerRunnable(""),
//2.设置执行周期(Trigger)
- triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext)
+ triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext)
);
}
@@ -40,8 +47,23 @@
return new Runnable() {
@Override
public void run() {
+ List alarmMsgList = new ArrayList<>();
deviceDataScannerList.forEach(
- deviceDataScanner -> deviceDataScanner.scanDeviceData()
+ deviceDataScanner -> {
+ alarmMsgList.addAll(deviceDataScanner.scanDeviceData());
+ }
+ );
+ alarmMsgList.forEach(
+ alarmMsg -> {
+ client.setSendMsg(alarmMsg);
+ Thread myThread = new Thread(client);
+ myThread.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
);
}
};
diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java
index 8bf0177..a54a8d9 100644
--- a/src/main/java/com/casic/client/Client.java
+++ b/src/main/java/com/casic/client/Client.java
@@ -6,17 +6,26 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
+@Slf4j
@Component
-public class Client {
+public class Client extends Thread {
@Autowired
private ServerPort serverPort;
- public void send(String sendMsg) throws Exception {
+ private volatile String sendMsg;
+
+ public void setSendMsg(String sendMsg) {
+ this.sendMsg = sendMsg;
+ }
+
+ @Override
+ public void run() {
+ super.run();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
@@ -25,10 +34,26 @@
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(sendMsg);
+ } catch (Exception ex) {
+ log.error("发送异常");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
+// public void send(String sendMsg) throws Exception {
+// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+// try {
+// Bootstrap bootstrap = new Bootstrap();
+// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+// .handler(new Clientinitializer());
+// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
+// Channel channel = channelFuture.channel();
+// channel.writeAndFlush(sendMsg);
+// } finally {
+// eventLoopGroup.shutdownGracefully();
+// }
+// }
+
}
diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
index 05e6d40..a96447c 100644
--- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
+++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
@@ -1,5 +1,7 @@
package com.casic.config.task;
+import com.casic.client.Client;
+import com.casic.config.TaskCronTimeConfig;
import com.casic.service.DeviceDataScanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -19,6 +22,10 @@
@Autowired
private List deviceDataScannerList;
+ @Autowired
+ private TaskCronTimeConfig taskCronTimeConfig;
+ @Autowired
+ private Client client;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
@@ -29,7 +36,7 @@
//设置日期任务
getMangerRunnable(""),
//2.设置执行周期(Trigger)
- triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext)
+ triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext)
);
}
@@ -40,8 +47,23 @@
return new Runnable() {
@Override
public void run() {
+ List alarmMsgList = new ArrayList<>();
deviceDataScannerList.forEach(
- deviceDataScanner -> deviceDataScanner.scanDeviceData()
+ deviceDataScanner -> {
+ alarmMsgList.addAll(deviceDataScanner.scanDeviceData());
+ }
+ );
+ alarmMsgList.forEach(
+ alarmMsg -> {
+ client.setSendMsg(alarmMsg);
+ Thread myThread = new Thread(client);
+ myThread.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
);
}
};
diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java
index d00c952..b2c40ee 100644
--- a/src/main/java/com/casic/resolver/DatagramResolver.java
+++ b/src/main/java/com/casic/resolver/DatagramResolver.java
@@ -1,8 +1,9 @@
package com.casic.resolver;
+import com.baomidou.mybatisplus.annotation.TableId;
import com.casic.model.RelayStatusDTO;
-public interface DatagramResolver {
+public interface DatagramResolver{
RelayStatusDTO datagram(String msg);
diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java
index 8bf0177..a54a8d9 100644
--- a/src/main/java/com/casic/client/Client.java
+++ b/src/main/java/com/casic/client/Client.java
@@ -6,17 +6,26 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
+@Slf4j
@Component
-public class Client {
+public class Client extends Thread {
@Autowired
private ServerPort serverPort;
- public void send(String sendMsg) throws Exception {
+ private volatile String sendMsg;
+
+ public void setSendMsg(String sendMsg) {
+ this.sendMsg = sendMsg;
+ }
+
+ @Override
+ public void run() {
+ super.run();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
@@ -25,10 +34,26 @@
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(sendMsg);
+ } catch (Exception ex) {
+ log.error("发送异常");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
+// public void send(String sendMsg) throws Exception {
+// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+// try {
+// Bootstrap bootstrap = new Bootstrap();
+// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+// .handler(new Clientinitializer());
+// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
+// Channel channel = channelFuture.channel();
+// channel.writeAndFlush(sendMsg);
+// } finally {
+// eventLoopGroup.shutdownGracefully();
+// }
+// }
+
}
diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
index 05e6d40..a96447c 100644
--- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
+++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
@@ -1,5 +1,7 @@
package com.casic.config.task;
+import com.casic.client.Client;
+import com.casic.config.TaskCronTimeConfig;
import com.casic.service.DeviceDataScanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -19,6 +22,10 @@
@Autowired
private List deviceDataScannerList;
+ @Autowired
+ private TaskCronTimeConfig taskCronTimeConfig;
+ @Autowired
+ private Client client;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
@@ -29,7 +36,7 @@
//设置日期任务
getMangerRunnable(""),
//2.设置执行周期(Trigger)
- triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext)
+ triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext)
);
}
@@ -40,8 +47,23 @@
return new Runnable() {
@Override
public void run() {
+ List alarmMsgList = new ArrayList<>();
deviceDataScannerList.forEach(
- deviceDataScanner -> deviceDataScanner.scanDeviceData()
+ deviceDataScanner -> {
+ alarmMsgList.addAll(deviceDataScanner.scanDeviceData());
+ }
+ );
+ alarmMsgList.forEach(
+ alarmMsg -> {
+ client.setSendMsg(alarmMsg);
+ Thread myThread = new Thread(client);
+ myThread.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
);
}
};
diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java
index d00c952..b2c40ee 100644
--- a/src/main/java/com/casic/resolver/DatagramResolver.java
+++ b/src/main/java/com/casic/resolver/DatagramResolver.java
@@ -1,8 +1,9 @@
package com.casic.resolver;
+import com.baomidou.mybatisplus.annotation.TableId;
import com.casic.model.RelayStatusDTO;
-public interface DatagramResolver {
+public interface DatagramResolver{
RelayStatusDTO datagram(String msg);
diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java
index bd6616c..627c518 100644
--- a/src/main/java/com/casic/server/ReceiverServerHandler.java
+++ b/src/main/java/com/casic/server/ReceiverServerHandler.java
@@ -15,11 +15,13 @@
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* @description: 消息处理handler
@@ -31,7 +33,7 @@
@ChannelHandler.Sharable
public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums {
- final ChannelGroup channels =
+ volatile ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Autowired
@@ -67,9 +69,9 @@
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
if (msg instanceof String) {
- if (!String.valueOf(msg).contains("iccid")){
+ if (!String.valueOf(msg).contains("iccid")) {
datagramResolverList.forEach(
rainFallDataResolver -> {
RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg));
@@ -85,6 +87,9 @@
if (!ObjectUtils.isEmpty(relayStatusDTO)) {
ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer();
String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix;
+// for (int i = 1; i < 6; i++) {
+// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix;
+ System.out.println(switchConent);
out.writeBytes(switchConent.getBytes());
channels.forEach(channel -> {
if (channel.isActive()) {
@@ -95,6 +100,7 @@
}
);
}
+// }
}
@Override
diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java
index 8bf0177..a54a8d9 100644
--- a/src/main/java/com/casic/client/Client.java
+++ b/src/main/java/com/casic/client/Client.java
@@ -6,17 +6,26 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
+@Slf4j
@Component
-public class Client {
+public class Client extends Thread {
@Autowired
private ServerPort serverPort;
- public void send(String sendMsg) throws Exception {
+ private volatile String sendMsg;
+
+ public void setSendMsg(String sendMsg) {
+ this.sendMsg = sendMsg;
+ }
+
+ @Override
+ public void run() {
+ super.run();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
@@ -25,10 +34,26 @@
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(sendMsg);
+ } catch (Exception ex) {
+ log.error("发送异常");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
+// public void send(String sendMsg) throws Exception {
+// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+// try {
+// Bootstrap bootstrap = new Bootstrap();
+// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+// .handler(new Clientinitializer());
+// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
+// Channel channel = channelFuture.channel();
+// channel.writeAndFlush(sendMsg);
+// } finally {
+// eventLoopGroup.shutdownGracefully();
+// }
+// }
+
}
diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
index 05e6d40..a96447c 100644
--- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
+++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
@@ -1,5 +1,7 @@
package com.casic.config.task;
+import com.casic.client.Client;
+import com.casic.config.TaskCronTimeConfig;
import com.casic.service.DeviceDataScanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -19,6 +22,10 @@
@Autowired
private List deviceDataScannerList;
+ @Autowired
+ private TaskCronTimeConfig taskCronTimeConfig;
+ @Autowired
+ private Client client;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
@@ -29,7 +36,7 @@
//设置日期任务
getMangerRunnable(""),
//2.设置执行周期(Trigger)
- triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext)
+ triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext)
);
}
@@ -40,8 +47,23 @@
return new Runnable() {
@Override
public void run() {
+ List alarmMsgList = new ArrayList<>();
deviceDataScannerList.forEach(
- deviceDataScanner -> deviceDataScanner.scanDeviceData()
+ deviceDataScanner -> {
+ alarmMsgList.addAll(deviceDataScanner.scanDeviceData());
+ }
+ );
+ alarmMsgList.forEach(
+ alarmMsg -> {
+ client.setSendMsg(alarmMsg);
+ Thread myThread = new Thread(client);
+ myThread.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
);
}
};
diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java
index d00c952..b2c40ee 100644
--- a/src/main/java/com/casic/resolver/DatagramResolver.java
+++ b/src/main/java/com/casic/resolver/DatagramResolver.java
@@ -1,8 +1,9 @@
package com.casic.resolver;
+import com.baomidou.mybatisplus.annotation.TableId;
import com.casic.model.RelayStatusDTO;
-public interface DatagramResolver {
+public interface DatagramResolver{
RelayStatusDTO datagram(String msg);
diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java
index bd6616c..627c518 100644
--- a/src/main/java/com/casic/server/ReceiverServerHandler.java
+++ b/src/main/java/com/casic/server/ReceiverServerHandler.java
@@ -15,11 +15,13 @@
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* @description: 消息处理handler
@@ -31,7 +33,7 @@
@ChannelHandler.Sharable
public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums {
- final ChannelGroup channels =
+ volatile ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Autowired
@@ -67,9 +69,9 @@
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
if (msg instanceof String) {
- if (!String.valueOf(msg).contains("iccid")){
+ if (!String.valueOf(msg).contains("iccid")) {
datagramResolverList.forEach(
rainFallDataResolver -> {
RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg));
@@ -85,6 +87,9 @@
if (!ObjectUtils.isEmpty(relayStatusDTO)) {
ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer();
String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix;
+// for (int i = 1; i < 6; i++) {
+// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix;
+ System.out.println(switchConent);
out.writeBytes(switchConent.getBytes());
channels.forEach(channel -> {
if (channel.isActive()) {
@@ -95,6 +100,7 @@
}
);
}
+// }
}
@Override
diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java
index 049fa71..5bacb17 100644
--- a/src/main/java/com/casic/service/DeviceDataScanner.java
+++ b/src/main/java/com/casic/service/DeviceDataScanner.java
@@ -1,7 +1,9 @@
package com.casic.service;
+import java.util.List;
+
public interface DeviceDataScanner {
- void scanDeviceData();
+ List scanDeviceData();
}
diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java
index 8bf0177..a54a8d9 100644
--- a/src/main/java/com/casic/client/Client.java
+++ b/src/main/java/com/casic/client/Client.java
@@ -6,17 +6,26 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
+@Slf4j
@Component
-public class Client {
+public class Client extends Thread {
@Autowired
private ServerPort serverPort;
- public void send(String sendMsg) throws Exception {
+ private volatile String sendMsg;
+
+ public void setSendMsg(String sendMsg) {
+ this.sendMsg = sendMsg;
+ }
+
+ @Override
+ public void run() {
+ super.run();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
@@ -25,10 +34,26 @@
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(sendMsg);
+ } catch (Exception ex) {
+ log.error("发送异常");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
+// public void send(String sendMsg) throws Exception {
+// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+// try {
+// Bootstrap bootstrap = new Bootstrap();
+// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+// .handler(new Clientinitializer());
+// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
+// Channel channel = channelFuture.channel();
+// channel.writeAndFlush(sendMsg);
+// } finally {
+// eventLoopGroup.shutdownGracefully();
+// }
+// }
+
}
diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
index 05e6d40..a96447c 100644
--- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
+++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
@@ -1,5 +1,7 @@
package com.casic.config.task;
+import com.casic.client.Client;
+import com.casic.config.TaskCronTimeConfig;
import com.casic.service.DeviceDataScanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -19,6 +22,10 @@
@Autowired
private List deviceDataScannerList;
+ @Autowired
+ private TaskCronTimeConfig taskCronTimeConfig;
+ @Autowired
+ private Client client;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
@@ -29,7 +36,7 @@
//设置日期任务
getMangerRunnable(""),
//2.设置执行周期(Trigger)
- triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext)
+ triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext)
);
}
@@ -40,8 +47,23 @@
return new Runnable() {
@Override
public void run() {
+ List alarmMsgList = new ArrayList<>();
deviceDataScannerList.forEach(
- deviceDataScanner -> deviceDataScanner.scanDeviceData()
+ deviceDataScanner -> {
+ alarmMsgList.addAll(deviceDataScanner.scanDeviceData());
+ }
+ );
+ alarmMsgList.forEach(
+ alarmMsg -> {
+ client.setSendMsg(alarmMsg);
+ Thread myThread = new Thread(client);
+ myThread.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
);
}
};
diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java
index d00c952..b2c40ee 100644
--- a/src/main/java/com/casic/resolver/DatagramResolver.java
+++ b/src/main/java/com/casic/resolver/DatagramResolver.java
@@ -1,8 +1,9 @@
package com.casic.resolver;
+import com.baomidou.mybatisplus.annotation.TableId;
import com.casic.model.RelayStatusDTO;
-public interface DatagramResolver {
+public interface DatagramResolver{
RelayStatusDTO datagram(String msg);
diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java
index bd6616c..627c518 100644
--- a/src/main/java/com/casic/server/ReceiverServerHandler.java
+++ b/src/main/java/com/casic/server/ReceiverServerHandler.java
@@ -15,11 +15,13 @@
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* @description: 消息处理handler
@@ -31,7 +33,7 @@
@ChannelHandler.Sharable
public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums {
- final ChannelGroup channels =
+ volatile ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Autowired
@@ -67,9 +69,9 @@
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
if (msg instanceof String) {
- if (!String.valueOf(msg).contains("iccid")){
+ if (!String.valueOf(msg).contains("iccid")) {
datagramResolverList.forEach(
rainFallDataResolver -> {
RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg));
@@ -85,6 +87,9 @@
if (!ObjectUtils.isEmpty(relayStatusDTO)) {
ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer();
String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix;
+// for (int i = 1; i < 6; i++) {
+// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix;
+ System.out.println(switchConent);
out.writeBytes(switchConent.getBytes());
channels.forEach(channel -> {
if (channel.isActive()) {
@@ -95,6 +100,7 @@
}
);
}
+// }
}
@Override
diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java
index 049fa71..5bacb17 100644
--- a/src/main/java/com/casic/service/DeviceDataScanner.java
+++ b/src/main/java/com/casic/service/DeviceDataScanner.java
@@ -1,7 +1,9 @@
package com.casic.service;
+import java.util.List;
+
public interface DeviceDataScanner {
- void scanDeviceData();
+ List scanDeviceData();
}
diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java
index 9ddcd7a..440c6b0 100644
--- a/src/main/java/com/casic/service/DeviceDataSupport.java
+++ b/src/main/java/com/casic/service/DeviceDataSupport.java
@@ -28,23 +28,11 @@
*/
protected Integer isAlarm(String realData, String thresholdValue) {
if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) {
- if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) {
+ if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) {
return 1;
}
}
return 0;
}
- /**
- * 发送报警消息
- */
- protected void sendAlarmMsg(String devcode, Integer isAlarm) {
- try {
- String alarmMsg = devcode + alarmMark + isAlarm;
- client.send(alarmMsg);
- } catch (Exception ex) {
- log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage());
- }
- }
-
}
diff --git a/pom.xml b/pom.xml
index a4d2a33..6a05fdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.casic
- rain_receiver
+ data_receiver
1.0-SNAPSHOT
jar
diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java
index 8bf0177..a54a8d9 100644
--- a/src/main/java/com/casic/client/Client.java
+++ b/src/main/java/com/casic/client/Client.java
@@ -6,17 +6,26 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-
+@Slf4j
@Component
-public class Client {
+public class Client extends Thread {
@Autowired
private ServerPort serverPort;
- public void send(String sendMsg) throws Exception {
+ private volatile String sendMsg;
+
+ public void setSendMsg(String sendMsg) {
+ this.sendMsg = sendMsg;
+ }
+
+ @Override
+ public void run() {
+ super.run();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
@@ -25,10 +34,26 @@
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush(sendMsg);
+ } catch (Exception ex) {
+ log.error("发送异常");
} finally {
eventLoopGroup.shutdownGracefully();
}
}
+// public void send(String sendMsg) throws Exception {
+// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+// try {
+// Bootstrap bootstrap = new Bootstrap();
+// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+// .handler(new Clientinitializer());
+// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync();
+// Channel channel = channelFuture.channel();
+// channel.writeAndFlush(sendMsg);
+// } finally {
+// eventLoopGroup.shutdownGracefully();
+// }
+// }
+
}
diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
index 05e6d40..a96447c 100644
--- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
+++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java
@@ -1,5 +1,7 @@
package com.casic.config.task;
+import com.casic.client.Client;
+import com.casic.config.TaskCronTimeConfig;
import com.casic.service.DeviceDataScanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -19,6 +22,10 @@
@Autowired
private List deviceDataScannerList;
+ @Autowired
+ private TaskCronTimeConfig taskCronTimeConfig;
+ @Autowired
+ private Client client;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
@@ -29,7 +36,7 @@
//设置日期任务
getMangerRunnable(""),
//2.设置执行周期(Trigger)
- triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext)
+ triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext)
);
}
@@ -40,8 +47,23 @@
return new Runnable() {
@Override
public void run() {
+ List alarmMsgList = new ArrayList<>();
deviceDataScannerList.forEach(
- deviceDataScanner -> deviceDataScanner.scanDeviceData()
+ deviceDataScanner -> {
+ alarmMsgList.addAll(deviceDataScanner.scanDeviceData());
+ }
+ );
+ alarmMsgList.forEach(
+ alarmMsg -> {
+ client.setSendMsg(alarmMsg);
+ Thread myThread = new Thread(client);
+ myThread.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
);
}
};
diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java
index d00c952..b2c40ee 100644
--- a/src/main/java/com/casic/resolver/DatagramResolver.java
+++ b/src/main/java/com/casic/resolver/DatagramResolver.java
@@ -1,8 +1,9 @@
package com.casic.resolver;
+import com.baomidou.mybatisplus.annotation.TableId;
import com.casic.model.RelayStatusDTO;
-public interface DatagramResolver {
+public interface DatagramResolver{
RelayStatusDTO datagram(String msg);
diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java
index bd6616c..627c518 100644
--- a/src/main/java/com/casic/server/ReceiverServerHandler.java
+++ b/src/main/java/com/casic/server/ReceiverServerHandler.java
@@ -15,11 +15,13 @@
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* @description: 消息处理handler
@@ -31,7 +33,7 @@
@ChannelHandler.Sharable
public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums {
- final ChannelGroup channels =
+ volatile ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Autowired
@@ -67,9 +69,9 @@
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
if (msg instanceof String) {
- if (!String.valueOf(msg).contains("iccid")){
+ if (!String.valueOf(msg).contains("iccid")) {
datagramResolverList.forEach(
rainFallDataResolver -> {
RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg));
@@ -85,6 +87,9 @@
if (!ObjectUtils.isEmpty(relayStatusDTO)) {
ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer();
String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix;
+// for (int i = 1; i < 6; i++) {
+// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix;
+ System.out.println(switchConent);
out.writeBytes(switchConent.getBytes());
channels.forEach(channel -> {
if (channel.isActive()) {
@@ -95,6 +100,7 @@
}
);
}
+// }
}
@Override
diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java
index 049fa71..5bacb17 100644
--- a/src/main/java/com/casic/service/DeviceDataScanner.java
+++ b/src/main/java/com/casic/service/DeviceDataScanner.java
@@ -1,7 +1,9 @@
package com.casic.service;
+import java.util.List;
+
public interface DeviceDataScanner {
- void scanDeviceData();
+ List scanDeviceData();
}
diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java
index 9ddcd7a..440c6b0 100644
--- a/src/main/java/com/casic/service/DeviceDataSupport.java
+++ b/src/main/java/com/casic/service/DeviceDataSupport.java
@@ -28,23 +28,11 @@
*/
protected Integer isAlarm(String realData, String thresholdValue) {
if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) {
- if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) {
+ if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) {
return 1;
}
}
return 0;
}
- /**
- * 发送报警消息
- */
- protected void sendAlarmMsg(String devcode, Integer isAlarm) {
- try {
- String alarmMsg = devcode + alarmMark + isAlarm;
- client.send(alarmMsg);
- } catch (Exception ex) {
- log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage());
- }
- }
-
}
diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java
index 741a1e8..0f02aa1 100644
--- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java
+++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java
@@ -4,6 +4,7 @@
import com.casic.service.DeviceDataSupport;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -11,7 +12,8 @@
public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner {
@Override
- public void scanDeviceData() {
+ public List scanDeviceData() {
+ List alarmMsgDTOList=new ArrayList<>();
String thresholdValue = alarmLevelConfig.getFlow();
List