diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java index 741a1e8..0f02aa1 100644 --- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java +++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList=new ArrayList<>(); String thresholdValue = alarmLevelConfig.getFlow(); List> flowDataMapList = this.deviceDataMapper.getFlowData(); flowDataMapList.forEach( @@ -19,9 +21,19 @@ String devcode = flowDataMap.get("DEVCODE").toString(); String insdata = flowDataMap.get("INSDATA").toString(); Integer isAlarm = isAlarm(insdata, thresholdValue); - sendAlarmMsg(devcode, isAlarm); + Integer mergeAlarm = 0; + ScannerMapCommon.flowMap.put(devcode, isAlarm); + if (ScannerMapCommon.pressMap.containsKey(devcode) && ScannerMapCommon.pressMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java index 741a1e8..0f02aa1 100644 --- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java +++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList=new ArrayList<>(); String thresholdValue = alarmLevelConfig.getFlow(); List> flowDataMapList = this.deviceDataMapper.getFlowData(); flowDataMapList.forEach( @@ -19,9 +21,19 @@ String devcode = flowDataMap.get("DEVCODE").toString(); String insdata = flowDataMap.get("INSDATA").toString(); Integer isAlarm = isAlarm(insdata, thresholdValue); - sendAlarmMsg(devcode, isAlarm); + Integer mergeAlarm = 0; + ScannerMapCommon.flowMap.put(devcode, isAlarm); + if (ScannerMapCommon.pressMap.containsKey(devcode) && ScannerMapCommon.pressMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } diff --git a/src/main/java/com/casic/service/scanner/PressDataScanner.java b/src/main/java/com/casic/service/scanner/PressDataScanner.java index 9aea77b..00ddc13 100644 --- a/src/main/java/com/casic/service/scanner/PressDataScanner.java +++ b/src/main/java/com/casic/service/scanner/PressDataScanner.java @@ -1,29 +1,39 @@ package com.casic.service.scanner; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.casic.mapper.DeviceDataMapper; import com.casic.service.DeviceDataScanner; import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @Service -public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { +public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String thresholdValue = alarmLevelConfig.getPressure(); List> pressDataMapList = this.deviceDataMapper.getPressureData(); pressDataMapList.forEach( pressDataMap -> { String devcode = pressDataMap.get("DEVCODE").toString(); String pressdata = pressDataMap.get("PRESSDATA").toString(); - Integer isAlarm=isAlarm(pressdata,thresholdValue); - sendAlarmMsg(devcode,isAlarm); + Integer isAlarm = isAlarm(pressdata, thresholdValue); + ScannerMapCommon.pressMap.put(devcode, isAlarm); + Integer mergeAlarm = 0; + if (ScannerMapCommon.flowMap.containsKey(devcode) && ScannerMapCommon.flowMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java index 741a1e8..0f02aa1 100644 --- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java +++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList=new ArrayList<>(); String thresholdValue = alarmLevelConfig.getFlow(); List> flowDataMapList = this.deviceDataMapper.getFlowData(); flowDataMapList.forEach( @@ -19,9 +21,19 @@ String devcode = flowDataMap.get("DEVCODE").toString(); String insdata = flowDataMap.get("INSDATA").toString(); Integer isAlarm = isAlarm(insdata, thresholdValue); - sendAlarmMsg(devcode, isAlarm); + Integer mergeAlarm = 0; + ScannerMapCommon.flowMap.put(devcode, isAlarm); + if (ScannerMapCommon.pressMap.containsKey(devcode) && ScannerMapCommon.pressMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } diff --git a/src/main/java/com/casic/service/scanner/PressDataScanner.java b/src/main/java/com/casic/service/scanner/PressDataScanner.java index 9aea77b..00ddc13 100644 --- a/src/main/java/com/casic/service/scanner/PressDataScanner.java +++ b/src/main/java/com/casic/service/scanner/PressDataScanner.java @@ -1,29 +1,39 @@ package com.casic.service.scanner; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.casic.mapper.DeviceDataMapper; import com.casic.service.DeviceDataScanner; import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @Service -public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { +public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String thresholdValue = alarmLevelConfig.getPressure(); List> pressDataMapList = this.deviceDataMapper.getPressureData(); pressDataMapList.forEach( pressDataMap -> { String devcode = pressDataMap.get("DEVCODE").toString(); String pressdata = pressDataMap.get("PRESSDATA").toString(); - Integer isAlarm=isAlarm(pressdata,thresholdValue); - sendAlarmMsg(devcode,isAlarm); + Integer isAlarm = isAlarm(pressdata, thresholdValue); + ScannerMapCommon.pressMap.put(devcode, isAlarm); + Integer mergeAlarm = 0; + if (ScannerMapCommon.flowMap.containsKey(devcode) && ScannerMapCommon.flowMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/service/scanner/ScannerMapCommon.java b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java new file mode 100644 index 0000000..efa3c9b --- /dev/null +++ b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java @@ -0,0 +1,16 @@ +package com.casic.service.scanner; + +import java.util.HashMap; +import java.util.Map; + +public class ScannerMapCommon { + + public static Map flowMap; + public static Map pressMap; + + static { + flowMap = new HashMap<>(); + pressMap = new HashMap<>(); + } + +} diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java index 741a1e8..0f02aa1 100644 --- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java +++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList=new ArrayList<>(); String thresholdValue = alarmLevelConfig.getFlow(); List> flowDataMapList = this.deviceDataMapper.getFlowData(); flowDataMapList.forEach( @@ -19,9 +21,19 @@ String devcode = flowDataMap.get("DEVCODE").toString(); String insdata = flowDataMap.get("INSDATA").toString(); Integer isAlarm = isAlarm(insdata, thresholdValue); - sendAlarmMsg(devcode, isAlarm); + Integer mergeAlarm = 0; + ScannerMapCommon.flowMap.put(devcode, isAlarm); + if (ScannerMapCommon.pressMap.containsKey(devcode) && ScannerMapCommon.pressMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } diff --git a/src/main/java/com/casic/service/scanner/PressDataScanner.java b/src/main/java/com/casic/service/scanner/PressDataScanner.java index 9aea77b..00ddc13 100644 --- a/src/main/java/com/casic/service/scanner/PressDataScanner.java +++ b/src/main/java/com/casic/service/scanner/PressDataScanner.java @@ -1,29 +1,39 @@ package com.casic.service.scanner; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.casic.mapper.DeviceDataMapper; import com.casic.service.DeviceDataScanner; import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @Service -public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { +public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String thresholdValue = alarmLevelConfig.getPressure(); List> pressDataMapList = this.deviceDataMapper.getPressureData(); pressDataMapList.forEach( pressDataMap -> { String devcode = pressDataMap.get("DEVCODE").toString(); String pressdata = pressDataMap.get("PRESSDATA").toString(); - Integer isAlarm=isAlarm(pressdata,thresholdValue); - sendAlarmMsg(devcode,isAlarm); + Integer isAlarm = isAlarm(pressdata, thresholdValue); + ScannerMapCommon.pressMap.put(devcode, isAlarm); + Integer mergeAlarm = 0; + if (ScannerMapCommon.flowMap.containsKey(devcode) && ScannerMapCommon.flowMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/service/scanner/ScannerMapCommon.java b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java new file mode 100644 index 0000000..efa3c9b --- /dev/null +++ b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java @@ -0,0 +1,16 @@ +package com.casic.service.scanner; + +import java.util.HashMap; +import java.util.Map; + +public class ScannerMapCommon { + + public static Map flowMap; + public static Map pressMap; + + static { + flowMap = new HashMap<>(); + pressMap = new HashMap<>(); + } + +} diff --git a/src/main/java/com/casic/service/scanner/TempHumiScanner.java b/src/main/java/com/casic/service/scanner/TempHumiScanner.java index 74aa34f..d2718cb 100644 --- a/src/main/java/com/casic/service/scanner/TempHumiScanner.java +++ b/src/main/java/com/casic/service/scanner/TempHumiScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class TempHumiScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String humiThreshold = alarmLevelConfig.getHumi(); String tempThreshold = alarmLevelConfig.getTemp(); List> pressDataMapList = this.deviceDataMapper.getHumiTemp(); @@ -23,12 +25,16 @@ String humidityData = pressDataMap.get("HUMIDITY").toString(); Integer isHumiAlarm = isAlarm(humidityData, humiThreshold); if (isHumiAlarm == 0 && isTempAlarm == 0) { - sendAlarmMsg(devcode, 0); + String alarmMsg = devcode + alarmMark +0; + alarmMsgDTOList.add(alarmMsg); } else { - sendAlarmMsg(devcode, 1); + String alarmMsg = devcode + alarmMark + 1; + alarmMsgDTOList.add(alarmMsg); } +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java index 741a1e8..0f02aa1 100644 --- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java +++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList=new ArrayList<>(); String thresholdValue = alarmLevelConfig.getFlow(); List> flowDataMapList = this.deviceDataMapper.getFlowData(); flowDataMapList.forEach( @@ -19,9 +21,19 @@ String devcode = flowDataMap.get("DEVCODE").toString(); String insdata = flowDataMap.get("INSDATA").toString(); Integer isAlarm = isAlarm(insdata, thresholdValue); - sendAlarmMsg(devcode, isAlarm); + Integer mergeAlarm = 0; + ScannerMapCommon.flowMap.put(devcode, isAlarm); + if (ScannerMapCommon.pressMap.containsKey(devcode) && ScannerMapCommon.pressMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } diff --git a/src/main/java/com/casic/service/scanner/PressDataScanner.java b/src/main/java/com/casic/service/scanner/PressDataScanner.java index 9aea77b..00ddc13 100644 --- a/src/main/java/com/casic/service/scanner/PressDataScanner.java +++ b/src/main/java/com/casic/service/scanner/PressDataScanner.java @@ -1,29 +1,39 @@ package com.casic.service.scanner; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.casic.mapper.DeviceDataMapper; import com.casic.service.DeviceDataScanner; import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @Service -public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { +public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String thresholdValue = alarmLevelConfig.getPressure(); List> pressDataMapList = this.deviceDataMapper.getPressureData(); pressDataMapList.forEach( pressDataMap -> { String devcode = pressDataMap.get("DEVCODE").toString(); String pressdata = pressDataMap.get("PRESSDATA").toString(); - Integer isAlarm=isAlarm(pressdata,thresholdValue); - sendAlarmMsg(devcode,isAlarm); + Integer isAlarm = isAlarm(pressdata, thresholdValue); + ScannerMapCommon.pressMap.put(devcode, isAlarm); + Integer mergeAlarm = 0; + if (ScannerMapCommon.flowMap.containsKey(devcode) && ScannerMapCommon.flowMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/service/scanner/ScannerMapCommon.java b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java new file mode 100644 index 0000000..efa3c9b --- /dev/null +++ b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java @@ -0,0 +1,16 @@ +package com.casic.service.scanner; + +import java.util.HashMap; +import java.util.Map; + +public class ScannerMapCommon { + + public static Map flowMap; + public static Map pressMap; + + static { + flowMap = new HashMap<>(); + pressMap = new HashMap<>(); + } + +} diff --git a/src/main/java/com/casic/service/scanner/TempHumiScanner.java b/src/main/java/com/casic/service/scanner/TempHumiScanner.java index 74aa34f..d2718cb 100644 --- a/src/main/java/com/casic/service/scanner/TempHumiScanner.java +++ b/src/main/java/com/casic/service/scanner/TempHumiScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class TempHumiScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String humiThreshold = alarmLevelConfig.getHumi(); String tempThreshold = alarmLevelConfig.getTemp(); List> pressDataMapList = this.deviceDataMapper.getHumiTemp(); @@ -23,12 +25,16 @@ String humidityData = pressDataMap.get("HUMIDITY").toString(); Integer isHumiAlarm = isAlarm(humidityData, humiThreshold); if (isHumiAlarm == 0 && isTempAlarm == 0) { - sendAlarmMsg(devcode, 0); + String alarmMsg = devcode + alarmMark +0; + alarmMsgDTOList.add(alarmMsg); } else { - sendAlarmMsg(devcode, 1); + String alarmMsg = devcode + alarmMark + 1; + alarmMsgDTOList.add(alarmMsg); } +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/util/MyThreadPool.java b/src/main/java/com/casic/util/MyThreadPool.java new file mode 100644 index 0000000..9077273 --- /dev/null +++ b/src/main/java/com/casic/util/MyThreadPool.java @@ -0,0 +1,66 @@ +package com.casic.util; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +/* + 这是自定义的线程池类; + + 成员变量: + 1:任务队列 集合 需要控制线程安全问题 + 2:当前线程数量 + 3:核心线程数量 + 4:最大线程数量 + 5:任务队列的长度 + 成员方法 + 1:提交任务; + 将任务添加到集合中,需要判断是否超出了任务总长度 + 2:执行任务; + 判断当前线程的数量,决定创建核心线程还是非核心线程 + */ +public class MyThreadPool { + // 1:任务队列 集合 需要控制线程安全问题 + private List tasks = Collections.synchronizedList(new LinkedList<>()); + //2:当前线程数量 + private int num; + //3:核心线程数量 + private int corePoolSize; + //4:最大线程数量 + private int maxSize; + //5:任务队列的长度 + private int workSize; + + public MyThreadPool(int corePoolSize, int maxSize, int workSize) { + this.corePoolSize = corePoolSize; + this.maxSize = maxSize; + this.workSize = workSize; + } + + //1:提交任务; + public void submit(Runnable r){ + //判断当前集合中任务的数量,是否超出了最大任务数量 + if(tasks.size()>=workSize){ + System.out.println("任务:"+r+"被丢弃了..."); + }else { + tasks.add(r); + //执行任务 + execTask(r); + } + } + //2:执行任务; + private void execTask(Runnable r) { + //判断当前线程池中的线程总数量,是否超出了核心数, + if(num < corePoolSize){ + new MyWorker("核心线程:"+num,tasks).start(); + num++; + }else if(num < maxSize){ + new MyWorker("非核心线程:"+num,tasks).start(); + num++; + }else { + System.out.println("任务:"+r+" 被缓存了..."); + } + } + +} + diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java index 741a1e8..0f02aa1 100644 --- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java +++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList=new ArrayList<>(); String thresholdValue = alarmLevelConfig.getFlow(); List> flowDataMapList = this.deviceDataMapper.getFlowData(); flowDataMapList.forEach( @@ -19,9 +21,19 @@ String devcode = flowDataMap.get("DEVCODE").toString(); String insdata = flowDataMap.get("INSDATA").toString(); Integer isAlarm = isAlarm(insdata, thresholdValue); - sendAlarmMsg(devcode, isAlarm); + Integer mergeAlarm = 0; + ScannerMapCommon.flowMap.put(devcode, isAlarm); + if (ScannerMapCommon.pressMap.containsKey(devcode) && ScannerMapCommon.pressMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } diff --git a/src/main/java/com/casic/service/scanner/PressDataScanner.java b/src/main/java/com/casic/service/scanner/PressDataScanner.java index 9aea77b..00ddc13 100644 --- a/src/main/java/com/casic/service/scanner/PressDataScanner.java +++ b/src/main/java/com/casic/service/scanner/PressDataScanner.java @@ -1,29 +1,39 @@ package com.casic.service.scanner; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.casic.mapper.DeviceDataMapper; import com.casic.service.DeviceDataScanner; import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @Service -public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { +public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String thresholdValue = alarmLevelConfig.getPressure(); List> pressDataMapList = this.deviceDataMapper.getPressureData(); pressDataMapList.forEach( pressDataMap -> { String devcode = pressDataMap.get("DEVCODE").toString(); String pressdata = pressDataMap.get("PRESSDATA").toString(); - Integer isAlarm=isAlarm(pressdata,thresholdValue); - sendAlarmMsg(devcode,isAlarm); + Integer isAlarm = isAlarm(pressdata, thresholdValue); + ScannerMapCommon.pressMap.put(devcode, isAlarm); + Integer mergeAlarm = 0; + if (ScannerMapCommon.flowMap.containsKey(devcode) && ScannerMapCommon.flowMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/service/scanner/ScannerMapCommon.java b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java new file mode 100644 index 0000000..efa3c9b --- /dev/null +++ b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java @@ -0,0 +1,16 @@ +package com.casic.service.scanner; + +import java.util.HashMap; +import java.util.Map; + +public class ScannerMapCommon { + + public static Map flowMap; + public static Map pressMap; + + static { + flowMap = new HashMap<>(); + pressMap = new HashMap<>(); + } + +} diff --git a/src/main/java/com/casic/service/scanner/TempHumiScanner.java b/src/main/java/com/casic/service/scanner/TempHumiScanner.java index 74aa34f..d2718cb 100644 --- a/src/main/java/com/casic/service/scanner/TempHumiScanner.java +++ b/src/main/java/com/casic/service/scanner/TempHumiScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class TempHumiScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String humiThreshold = alarmLevelConfig.getHumi(); String tempThreshold = alarmLevelConfig.getTemp(); List> pressDataMapList = this.deviceDataMapper.getHumiTemp(); @@ -23,12 +25,16 @@ String humidityData = pressDataMap.get("HUMIDITY").toString(); Integer isHumiAlarm = isAlarm(humidityData, humiThreshold); if (isHumiAlarm == 0 && isTempAlarm == 0) { - sendAlarmMsg(devcode, 0); + String alarmMsg = devcode + alarmMark +0; + alarmMsgDTOList.add(alarmMsg); } else { - sendAlarmMsg(devcode, 1); + String alarmMsg = devcode + alarmMark + 1; + alarmMsgDTOList.add(alarmMsg); } +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/util/MyThreadPool.java b/src/main/java/com/casic/util/MyThreadPool.java new file mode 100644 index 0000000..9077273 --- /dev/null +++ b/src/main/java/com/casic/util/MyThreadPool.java @@ -0,0 +1,66 @@ +package com.casic.util; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +/* + 这是自定义的线程池类; + + 成员变量: + 1:任务队列 集合 需要控制线程安全问题 + 2:当前线程数量 + 3:核心线程数量 + 4:最大线程数量 + 5:任务队列的长度 + 成员方法 + 1:提交任务; + 将任务添加到集合中,需要判断是否超出了任务总长度 + 2:执行任务; + 判断当前线程的数量,决定创建核心线程还是非核心线程 + */ +public class MyThreadPool { + // 1:任务队列 集合 需要控制线程安全问题 + private List tasks = Collections.synchronizedList(new LinkedList<>()); + //2:当前线程数量 + private int num; + //3:核心线程数量 + private int corePoolSize; + //4:最大线程数量 + private int maxSize; + //5:任务队列的长度 + private int workSize; + + public MyThreadPool(int corePoolSize, int maxSize, int workSize) { + this.corePoolSize = corePoolSize; + this.maxSize = maxSize; + this.workSize = workSize; + } + + //1:提交任务; + public void submit(Runnable r){ + //判断当前集合中任务的数量,是否超出了最大任务数量 + if(tasks.size()>=workSize){ + System.out.println("任务:"+r+"被丢弃了..."); + }else { + tasks.add(r); + //执行任务 + execTask(r); + } + } + //2:执行任务; + private void execTask(Runnable r) { + //判断当前线程池中的线程总数量,是否超出了核心数, + if(num < corePoolSize){ + new MyWorker("核心线程:"+num,tasks).start(); + num++; + }else if(num < maxSize){ + new MyWorker("非核心线程:"+num,tasks).start(); + num++; + }else { + System.out.println("任务:"+r+" 被缓存了..."); + } + } + +} + diff --git a/src/main/java/com/casic/util/MyWorker.java b/src/main/java/com/casic/util/MyWorker.java new file mode 100644 index 0000000..13b7395 --- /dev/null +++ b/src/main/java/com/casic/util/MyWorker.java @@ -0,0 +1,29 @@ +package com.casic.util; + +import java.util.List; + +/* + 需求: + 编写一个线程类,需要继承Thread类,设计一个属性,用于保存线程的名字; + 设计一个集合,用于保存所有的任务; + */ +public class MyWorker extends Thread{ + private String name;//保存线程的名字 + private List tasks; + //利用构造方法,给成员变量赋值 + + public MyWorker(String name, List tasks) { + super(name); + this.tasks = tasks; + } + + @Override + public void run() { + //判断集合中是否有任务,只要有,就一直执行任务 + while (tasks.size()>0){ + Runnable r = tasks.remove(0); + r.run(); + } + } +} + diff --git a/pom.xml b/pom.xml index a4d2a33..6a05fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.casic - rain_receiver + data_receiver 1.0-SNAPSHOT jar diff --git a/src/main/java/com/casic/client/Client.java b/src/main/java/com/casic/client/Client.java index 8bf0177..a54a8d9 100644 --- a/src/main/java/com/casic/client/Client.java +++ b/src/main/java/com/casic/client/Client.java @@ -6,17 +6,26 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; - +@Slf4j @Component -public class Client { +public class Client extends Thread { @Autowired private ServerPort serverPort; - public void send(String sendMsg) throws Exception { + private volatile String sendMsg; + + public void setSendMsg(String sendMsg) { + this.sendMsg = sendMsg; + } + + @Override + public void run() { + super.run(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); @@ -25,10 +34,26 @@ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush(sendMsg); + } catch (Exception ex) { + log.error("发送异常"); } finally { eventLoopGroup.shutdownGracefully(); } } +// public void send(String sendMsg) throws Exception { +// NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); +// try { +// Bootstrap bootstrap = new Bootstrap(); +// bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) +// .handler(new Clientinitializer()); +// ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort.getPort()).sync(); +// Channel channel = channelFuture.channel(); +// channel.writeAndFlush(sendMsg); +// } finally { +// eventLoopGroup.shutdownGracefully(); +// } +// } + } diff --git a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java index 05e6d40..a96447c 100644 --- a/src/main/java/com/casic/config/task/TaskSchedulingConfig.java +++ b/src/main/java/com/casic/config/task/TaskSchedulingConfig.java @@ -1,5 +1,7 @@ package com.casic.config.task; +import com.casic.client.Client; +import com.casic.config.TaskCronTimeConfig; import com.casic.service.DeviceDataScanner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +11,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -19,6 +22,10 @@ @Autowired private List deviceDataScannerList; + @Autowired + private TaskCronTimeConfig taskCronTimeConfig; + @Autowired + private Client client; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @@ -29,7 +36,7 @@ //设置日期任务 getMangerRunnable(""), //2.设置执行周期(Trigger) - triggerContext -> new CronTrigger("0 */1 * * * ?").nextExecutionTime(triggerContext) + triggerContext -> new CronTrigger(taskCronTimeConfig.getCronTime()).nextExecutionTime(triggerContext) ); } @@ -40,8 +47,23 @@ return new Runnable() { @Override public void run() { + List alarmMsgList = new ArrayList<>(); deviceDataScannerList.forEach( - deviceDataScanner -> deviceDataScanner.scanDeviceData() + deviceDataScanner -> { + alarmMsgList.addAll(deviceDataScanner.scanDeviceData()); + } + ); + alarmMsgList.forEach( + alarmMsg -> { + client.setSendMsg(alarmMsg); + Thread myThread = new Thread(client); + myThread.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } ); } }; diff --git a/src/main/java/com/casic/resolver/DatagramResolver.java b/src/main/java/com/casic/resolver/DatagramResolver.java index d00c952..b2c40ee 100644 --- a/src/main/java/com/casic/resolver/DatagramResolver.java +++ b/src/main/java/com/casic/resolver/DatagramResolver.java @@ -1,8 +1,9 @@ package com.casic.resolver; +import com.baomidou.mybatisplus.annotation.TableId; import com.casic.model.RelayStatusDTO; -public interface DatagramResolver { +public interface DatagramResolver{ RelayStatusDTO datagram(String msg); diff --git a/src/main/java/com/casic/server/ReceiverServerHandler.java b/src/main/java/com/casic/server/ReceiverServerHandler.java index bd6616c..627c518 100644 --- a/src/main/java/com/casic/server/ReceiverServerHandler.java +++ b/src/main/java/com/casic/server/ReceiverServerHandler.java @@ -15,11 +15,13 @@ import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @description: 消息处理handler @@ -31,7 +33,7 @@ @ChannelHandler.Sharable public class ReceiverServerHandler extends ChannelInboundHandlerAdapter implements RelaySwitchEnums { - final ChannelGroup channels = + volatile ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired @@ -67,9 +69,9 @@ */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); if (msg instanceof String) { - if (!String.valueOf(msg).contains("iccid")){ + if (!String.valueOf(msg).contains("iccid")) { datagramResolverList.forEach( rainFallDataResolver -> { RelayStatusDTO relayStatusDTO = rainFallDataResolver.datagram(String.valueOf(msg)); @@ -85,6 +87,9 @@ if (!ObjectUtils.isEmpty(relayStatusDTO)) { ByteBuf out = ByteBufAllocator.DEFAULT.heapBuffer(); String switchConent = preFix + relayStatusDTO.getChannelName() + "," + relayStatusDTO.getLampSwitch() + postFix; +// for (int i = 1; i < 6; i++) { +// switchConent = preFix + relayStatusDTO.getChannelName() + "," + 1 + postFix; + System.out.println(switchConent); out.writeBytes(switchConent.getBytes()); channels.forEach(channel -> { if (channel.isActive()) { @@ -95,6 +100,7 @@ } ); } +// } } @Override diff --git a/src/main/java/com/casic/service/DeviceDataScanner.java b/src/main/java/com/casic/service/DeviceDataScanner.java index 049fa71..5bacb17 100644 --- a/src/main/java/com/casic/service/DeviceDataScanner.java +++ b/src/main/java/com/casic/service/DeviceDataScanner.java @@ -1,7 +1,9 @@ package com.casic.service; +import java.util.List; + public interface DeviceDataScanner { - void scanDeviceData(); + List scanDeviceData(); } diff --git a/src/main/java/com/casic/service/DeviceDataSupport.java b/src/main/java/com/casic/service/DeviceDataSupport.java index 9ddcd7a..440c6b0 100644 --- a/src/main/java/com/casic/service/DeviceDataSupport.java +++ b/src/main/java/com/casic/service/DeviceDataSupport.java @@ -28,23 +28,11 @@ */ protected Integer isAlarm(String realData, String thresholdValue) { if (!StringUtils.isEmpty(realData) && !StringUtils.isEmpty(thresholdValue)) { - if (Float.valueOf(realData) >= Float.valueOf(thresholdValue)) { + if (Float.valueOf(realData) > Float.valueOf(thresholdValue)) { return 1; } } return 0; } - /** - * 发送报警消息 - */ - protected void sendAlarmMsg(String devcode, Integer isAlarm) { - try { - String alarmMsg = devcode + alarmMark + isAlarm; - client.send(alarmMsg); - } catch (Exception ex) { - log.error("消息发送失败,设备编号为{},异常信息{}", devcode, ex.getMessage()); - } - } - } diff --git a/src/main/java/com/casic/service/scanner/FlowDataScanner.java b/src/main/java/com/casic/service/scanner/FlowDataScanner.java index 741a1e8..0f02aa1 100644 --- a/src/main/java/com/casic/service/scanner/FlowDataScanner.java +++ b/src/main/java/com/casic/service/scanner/FlowDataScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class FlowDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList=new ArrayList<>(); String thresholdValue = alarmLevelConfig.getFlow(); List> flowDataMapList = this.deviceDataMapper.getFlowData(); flowDataMapList.forEach( @@ -19,9 +21,19 @@ String devcode = flowDataMap.get("DEVCODE").toString(); String insdata = flowDataMap.get("INSDATA").toString(); Integer isAlarm = isAlarm(insdata, thresholdValue); - sendAlarmMsg(devcode, isAlarm); + Integer mergeAlarm = 0; + ScannerMapCommon.flowMap.put(devcode, isAlarm); + if (ScannerMapCommon.pressMap.containsKey(devcode) && ScannerMapCommon.pressMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } diff --git a/src/main/java/com/casic/service/scanner/PressDataScanner.java b/src/main/java/com/casic/service/scanner/PressDataScanner.java index 9aea77b..00ddc13 100644 --- a/src/main/java/com/casic/service/scanner/PressDataScanner.java +++ b/src/main/java/com/casic/service/scanner/PressDataScanner.java @@ -1,29 +1,39 @@ package com.casic.service.scanner; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.casic.mapper.DeviceDataMapper; import com.casic.service.DeviceDataScanner; import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @Service -public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { +public class PressDataScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String thresholdValue = alarmLevelConfig.getPressure(); List> pressDataMapList = this.deviceDataMapper.getPressureData(); pressDataMapList.forEach( pressDataMap -> { String devcode = pressDataMap.get("DEVCODE").toString(); String pressdata = pressDataMap.get("PRESSDATA").toString(); - Integer isAlarm=isAlarm(pressdata,thresholdValue); - sendAlarmMsg(devcode,isAlarm); + Integer isAlarm = isAlarm(pressdata, thresholdValue); + ScannerMapCommon.pressMap.put(devcode, isAlarm); + Integer mergeAlarm = 0; + if (ScannerMapCommon.flowMap.containsKey(devcode) && ScannerMapCommon.flowMap.get(devcode) == 1) { + mergeAlarm = 1; + } else { + mergeAlarm = isAlarm; + } + String alarmMsg = devcode + alarmMark + mergeAlarm; + alarmMsgDTOList.add(alarmMsg); +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/service/scanner/ScannerMapCommon.java b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java new file mode 100644 index 0000000..efa3c9b --- /dev/null +++ b/src/main/java/com/casic/service/scanner/ScannerMapCommon.java @@ -0,0 +1,16 @@ +package com.casic.service.scanner; + +import java.util.HashMap; +import java.util.Map; + +public class ScannerMapCommon { + + public static Map flowMap; + public static Map pressMap; + + static { + flowMap = new HashMap<>(); + pressMap = new HashMap<>(); + } + +} diff --git a/src/main/java/com/casic/service/scanner/TempHumiScanner.java b/src/main/java/com/casic/service/scanner/TempHumiScanner.java index 74aa34f..d2718cb 100644 --- a/src/main/java/com/casic/service/scanner/TempHumiScanner.java +++ b/src/main/java/com/casic/service/scanner/TempHumiScanner.java @@ -4,6 +4,7 @@ import com.casic.service.DeviceDataSupport; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -11,7 +12,8 @@ public class TempHumiScanner extends DeviceDataSupport implements DeviceDataScanner { @Override - public void scanDeviceData() { + public List scanDeviceData() { + List alarmMsgDTOList = new ArrayList<>(); String humiThreshold = alarmLevelConfig.getHumi(); String tempThreshold = alarmLevelConfig.getTemp(); List> pressDataMapList = this.deviceDataMapper.getHumiTemp(); @@ -23,12 +25,16 @@ String humidityData = pressDataMap.get("HUMIDITY").toString(); Integer isHumiAlarm = isAlarm(humidityData, humiThreshold); if (isHumiAlarm == 0 && isTempAlarm == 0) { - sendAlarmMsg(devcode, 0); + String alarmMsg = devcode + alarmMark +0; + alarmMsgDTOList.add(alarmMsg); } else { - sendAlarmMsg(devcode, 1); + String alarmMsg = devcode + alarmMark + 1; + alarmMsgDTOList.add(alarmMsg); } +// sendAlarmMsg(devcode, mergeAlarm); } ); + return alarmMsgDTOList; } } diff --git a/src/main/java/com/casic/util/MyThreadPool.java b/src/main/java/com/casic/util/MyThreadPool.java new file mode 100644 index 0000000..9077273 --- /dev/null +++ b/src/main/java/com/casic/util/MyThreadPool.java @@ -0,0 +1,66 @@ +package com.casic.util; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +/* + 这是自定义的线程池类; + + 成员变量: + 1:任务队列 集合 需要控制线程安全问题 + 2:当前线程数量 + 3:核心线程数量 + 4:最大线程数量 + 5:任务队列的长度 + 成员方法 + 1:提交任务; + 将任务添加到集合中,需要判断是否超出了任务总长度 + 2:执行任务; + 判断当前线程的数量,决定创建核心线程还是非核心线程 + */ +public class MyThreadPool { + // 1:任务队列 集合 需要控制线程安全问题 + private List tasks = Collections.synchronizedList(new LinkedList<>()); + //2:当前线程数量 + private int num; + //3:核心线程数量 + private int corePoolSize; + //4:最大线程数量 + private int maxSize; + //5:任务队列的长度 + private int workSize; + + public MyThreadPool(int corePoolSize, int maxSize, int workSize) { + this.corePoolSize = corePoolSize; + this.maxSize = maxSize; + this.workSize = workSize; + } + + //1:提交任务; + public void submit(Runnable r){ + //判断当前集合中任务的数量,是否超出了最大任务数量 + if(tasks.size()>=workSize){ + System.out.println("任务:"+r+"被丢弃了..."); + }else { + tasks.add(r); + //执行任务 + execTask(r); + } + } + //2:执行任务; + private void execTask(Runnable r) { + //判断当前线程池中的线程总数量,是否超出了核心数, + if(num < corePoolSize){ + new MyWorker("核心线程:"+num,tasks).start(); + num++; + }else if(num < maxSize){ + new MyWorker("非核心线程:"+num,tasks).start(); + num++; + }else { + System.out.println("任务:"+r+" 被缓存了..."); + } + } + +} + diff --git a/src/main/java/com/casic/util/MyWorker.java b/src/main/java/com/casic/util/MyWorker.java new file mode 100644 index 0000000..13b7395 --- /dev/null +++ b/src/main/java/com/casic/util/MyWorker.java @@ -0,0 +1,29 @@ +package com.casic.util; + +import java.util.List; + +/* + 需求: + 编写一个线程类,需要继承Thread类,设计一个属性,用于保存线程的名字; + 设计一个集合,用于保存所有的任务; + */ +public class MyWorker extends Thread{ + private String name;//保存线程的名字 + private List tasks; + //利用构造方法,给成员变量赋值 + + public MyWorker(String name, List tasks) { + super(name); + this.tasks = tasks; + } + + @Override + public void run() { + //判断集合中是否有任务,只要有,就一直执行任务 + while (tasks.size()>0){ + Runnable r = tasks.remove(0); + r.run(); + } + } +} + diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d780e9a..104e4e0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -11,9 +11,9 @@ mybatis-plus: global-config: enable-sql-runner: true - configuration: + #configuration: # 配置结果集属性为空时 是否映射返回结果 - log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #打印sql语句,调试用 + # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #打印sql语句,调试用 mapper-locations: classpath:mapper/*.xml mybatis: mapper-locations: classpath:mapper/*.xml