diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java new file mode 100644 index 0000000..530b5a1 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java @@ -0,0 +1,36 @@ +package com.casic.missiles.util; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + + +@Configuration +public class PoolConfig { + + @Bean("syncExecutorPool") + public Executor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心池大小 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(10); + // 队列程度 + taskExecutor.setQueueCapacity(100); + // 线程空闲时间 + taskExecutor.setKeepAliveSeconds(20); + // 线程前缀名称 + taskExecutor.setThreadNamePrefix("syncExecutor--"); + // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, + // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 + taskExecutor.setWaitForTasksToCompleteOnShutdown(true); + // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 + taskExecutor.setAwaitTerminationSeconds(20); + // 线程不够用时由调用的线程处理该任务 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return taskExecutor; + } +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java new file mode 100644 index 0000000..530b5a1 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java @@ -0,0 +1,36 @@ +package com.casic.missiles.util; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + + +@Configuration +public class PoolConfig { + + @Bean("syncExecutorPool") + public Executor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心池大小 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(10); + // 队列程度 + taskExecutor.setQueueCapacity(100); + // 线程空闲时间 + taskExecutor.setKeepAliveSeconds(20); + // 线程前缀名称 + taskExecutor.setThreadNamePrefix("syncExecutor--"); + // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, + // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 + taskExecutor.setWaitForTasksToCompleteOnShutdown(true); + // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 + taskExecutor.setAwaitTerminationSeconds(20); + // 线程不够用时由调用的线程处理该任务 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return taskExecutor; + } +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java new file mode 100644 index 0000000..0e468b0 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java new file mode 100644 index 0000000..530b5a1 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java @@ -0,0 +1,36 @@ +package com.casic.missiles.util; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + + +@Configuration +public class PoolConfig { + + @Bean("syncExecutorPool") + public Executor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心池大小 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(10); + // 队列程度 + taskExecutor.setQueueCapacity(100); + // 线程空闲时间 + taskExecutor.setKeepAliveSeconds(20); + // 线程前缀名称 + taskExecutor.setThreadNamePrefix("syncExecutor--"); + // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, + // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 + taskExecutor.setWaitForTasksToCompleteOnShutdown(true); + // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 + taskExecutor.setAwaitTerminationSeconds(20); + // 线程不够用时由调用的线程处理该任务 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return taskExecutor; + } +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java new file mode 100644 index 0000000..0e468b0 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java b/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java deleted file mode 100644 index 8d6b804..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -@Data -@Component -@ConfigurationProperties(prefix = "smartcity.getui") -public class PushProperties { - private String appId = null; - private String appKey = null; - private String masterSecret = null; - - public PushProperties() { - } -} diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java new file mode 100644 index 0000000..530b5a1 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java @@ -0,0 +1,36 @@ +package com.casic.missiles.util; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + + +@Configuration +public class PoolConfig { + + @Bean("syncExecutorPool") + public Executor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心池大小 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(10); + // 队列程度 + taskExecutor.setQueueCapacity(100); + // 线程空闲时间 + taskExecutor.setKeepAliveSeconds(20); + // 线程前缀名称 + taskExecutor.setThreadNamePrefix("syncExecutor--"); + // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, + // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 + taskExecutor.setWaitForTasksToCompleteOnShutdown(true); + // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 + taskExecutor.setAwaitTerminationSeconds(20); + // 线程不够用时由调用的线程处理该任务 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return taskExecutor; + } +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java new file mode 100644 index 0000000..0e468b0 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java b/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java deleted file mode 100644 index 8d6b804..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -@Data -@Component -@ConfigurationProperties(prefix = "smartcity.getui") -public class PushProperties { - private String appId = null; - private String appKey = null; - private String masterSecret = null; - - public PushProperties() { - } -} diff --git a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java b/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java deleted file mode 100644 index 9c9c4b5..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.server.standard.ServerEndpointExporter; - -// tomcat启动无需配置 -@Configuration -public class WebSocketConfig { - /** - * 注入ServerEndpointExporter, - * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint - */ - @Bean - public ServerEndpointExporter serverEndpointExporter() { - return new ServerEndpointExporter(); - } -} diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java new file mode 100644 index 0000000..530b5a1 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java @@ -0,0 +1,36 @@ +package com.casic.missiles.util; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + + +@Configuration +public class PoolConfig { + + @Bean("syncExecutorPool") + public Executor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心池大小 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(10); + // 队列程度 + taskExecutor.setQueueCapacity(100); + // 线程空闲时间 + taskExecutor.setKeepAliveSeconds(20); + // 线程前缀名称 + taskExecutor.setThreadNamePrefix("syncExecutor--"); + // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, + // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 + taskExecutor.setWaitForTasksToCompleteOnShutdown(true); + // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 + taskExecutor.setAwaitTerminationSeconds(20); + // 线程不够用时由调用的线程处理该任务 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return taskExecutor; + } +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java new file mode 100644 index 0000000..0e468b0 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java b/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java deleted file mode 100644 index 8d6b804..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -@Data -@Component -@ConfigurationProperties(prefix = "smartcity.getui") -public class PushProperties { - private String appId = null; - private String appKey = null; - private String masterSecret = null; - - public PushProperties() { - } -} diff --git a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java b/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java deleted file mode 100644 index 9c9c4b5..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.server.standard.ServerEndpointExporter; - -// tomcat启动无需配置 -@Configuration -public class WebSocketConfig { - /** - * 注入ServerEndpointExporter, - * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint - */ - @Bean - public ServerEndpointExporter serverEndpointExporter() { - return new ServerEndpointExporter(); - } -} diff --git a/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java b/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java index 84dfdce..3117e09 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java @@ -159,7 +159,7 @@ //写入报警 //1、判断报警是否已存在 if (!alarmRecordsService.isDataAlarmByCode(devCode, Float.valueOf(alarmRuleResponseDTO.getAlarmThreshold()))) { - alarmRecordsService.saveAlarms(devCode, gas, upTime, busWellDTOList, alarmRuleResponseDTO); + alarmRecordsService.saveAlarms(devCode, gas, upTime, busWellDTOList, alarmRuleResponseDTO,DeviceTypeEnum.Methane.getName()); } break; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java new file mode 100644 index 0000000..530b5a1 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java @@ -0,0 +1,36 @@ +package com.casic.missiles.util; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + + +@Configuration +public class PoolConfig { + + @Bean("syncExecutorPool") + public Executor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心池大小 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(10); + // 队列程度 + taskExecutor.setQueueCapacity(100); + // 线程空闲时间 + taskExecutor.setKeepAliveSeconds(20); + // 线程前缀名称 + taskExecutor.setThreadNamePrefix("syncExecutor--"); + // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, + // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 + taskExecutor.setWaitForTasksToCompleteOnShutdown(true); + // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 + taskExecutor.setAwaitTerminationSeconds(20); + // 线程不够用时由调用的线程处理该任务 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return taskExecutor; + } +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java new file mode 100644 index 0000000..0e468b0 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java b/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java deleted file mode 100644 index 8d6b804..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -@Data -@Component -@ConfigurationProperties(prefix = "smartcity.getui") -public class PushProperties { - private String appId = null; - private String appKey = null; - private String masterSecret = null; - - public PushProperties() { - } -} diff --git a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java b/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java deleted file mode 100644 index 9c9c4b5..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.server.standard.ServerEndpointExporter; - -// tomcat启动无需配置 -@Configuration -public class WebSocketConfig { - /** - * 注入ServerEndpointExporter, - * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint - */ - @Bean - public ServerEndpointExporter serverEndpointExporter() { - return new ServerEndpointExporter(); - } -} diff --git a/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java b/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java index 84dfdce..3117e09 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java @@ -159,7 +159,7 @@ //写入报警 //1、判断报警是否已存在 if (!alarmRecordsService.isDataAlarmByCode(devCode, Float.valueOf(alarmRuleResponseDTO.getAlarmThreshold()))) { - alarmRecordsService.saveAlarms(devCode, gas, upTime, busWellDTOList, alarmRuleResponseDTO); + alarmRecordsService.saveAlarms(devCode, gas, upTime, busWellDTOList, alarmRuleResponseDTO,DeviceTypeEnum.Methane.getName()); } break; } diff --git a/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java b/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java index a21688a..1bdc6a0 100644 --- a/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java +++ b/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java @@ -84,39 +84,17 @@ }); return ReturnUtil.success(iBusLedgerPipeService.watchDataByPipe(requestDTO)); } -// -// @ApiOperation("全生命周期记录查询") -// @PostMapping("/lifecycleRecord") -// @ResponseBody -// public ReturnDTO lifecycleRecord(@RequestBody DeviceMonitorRequestDTO requestDTO) { -// Assert.isFalse(Objects.isNull(requestDTO.getWellId()), () -> { -// throw new BusinessException(BusinessExceptionEnum.ID_NULL); -// }); -// return ReturnUtil.success(wellMonitorService.lifecycleRecord(requestDTO)); -// } -// -// @ApiOperation("全生命周期记录导出") -// @RequestMapping(value = "/lifecycleRecordExp", method = RequestMethod.GET) -// public void lifecycleRecordExp(@RequestBody DeviceMonitorRequestDTO requestDTO, HttpServletResponse response) { -// Assert.isFalse(Objects.isNull(requestDTO.getWellId()), () -> { -// throw new BusinessException(BusinessExceptionEnum.ID_NULL); -// }); -// try { -// iBaseExportService.exportExcel(response, LifeCycleRecordDTO.class, wellMonitorService.lifecycleRecord(requestDTO), ExportEnum.LIFE_CYCLE_RECORD_EXPORT.getSheetName()); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -// -// @ApiOperation("监控设备维护记录") -// @PostMapping("/repairLog") -// @ResponseBody -// public ReturnDTO repairLog(@RequestBody DeviceMonitorRequestDTO requestDTO) { + + @ApiOperation("监控设备维护记录(待开发)") + @PostMapping("/repairLog") + @ResponseBody + public ReturnDTO repairLog(@RequestBody MonitorDataRequestDTO requestDTO) { // Assert.isFalse(Objects.isNull(requestDTO.getWellId()), () -> { // throw new BusinessException(BusinessExceptionEnum.ID_NULL); // }); // return ReturnUtil.success(wellMonitorService.repairLog(requestDTO)); -// } + return ReturnUtil.success(); + } } diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml index 7651156..dee090f 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/dao/mapping/AlarmRecordsMapper.xml @@ -269,7 +269,7 @@ rd.*, FROM alarm_records rd - LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = bl.LEDGER_CODE + LEFT JOIN bus_ledger_all_view lr ON lr.ledgerCode = rd.LEDGER_CODE where rd.STATUS =1 and rd.EXCEPTION_TYPE=0 and rd.PROCESS_STATUS ]]> 3 and rd.PROCESS_STATUS 3 diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java index 0578678..1e6043a 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/IAlarmRecordsService.java @@ -58,7 +58,7 @@ boolean isDeviceAlarmByCode(String devCode,String alarmThreshold); - boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO); + boolean saveAlarms(String devCode,String gas,String upTime, List busWellDTOList,AlarmRuleResponseDTO alarmRuleResponseDTO,String typeName); boolean saveDeviceAlarms(String devCode, String logTime, List busWellDTOList, Optional alarmRuleResponseDTO); diff --git a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java index 271d07d..2c57afd 100644 --- a/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java +++ b/casic-alarm/src/main/java/com/casic/missiles/modular/alarm/service/impl/AlarmRecordsServiceImpl.java @@ -3,6 +3,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +28,8 @@ import com.casic.missiles.modular.alarm.service.IAlarmRecordsService; import com.casic.missiles.modular.system.model.Dept; import com.casic.missiles.util.CommonUtil; +import com.casic.missiles.util.PoolConfig; +import com.casic.missiles.util.WebSocket; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,7 +57,8 @@ private final RemindLogMapper remindLogMapper; private final AbstractDeptService abstractDeptService; private final AbstractDictService abstractDictService; - + private final WebSocket webSocket; + private PoolConfig threadPoolTaskExecutor; @Override public Page pageList(Page page, AlarmRecordDTO request, DataScope dataScope) { @@ -367,20 +371,20 @@ * 写入告警记录 * * @param devCode - * @param gas + * @param data * @param upTime * @param busWellDTOList * @param alarmRuleResponseDTO * @return */ @Override - public boolean saveAlarms(String devCode, String gas, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO) { + public boolean saveAlarms(String devCode, String data, String upTime, List busWellDTOList, AlarmRuleResponseDTO alarmRuleResponseDTO, String typeName) { for (BusAlarmLedgerDTO busLedgerDTO : busWellDTOList) { try { AlarmRecords alarmRecords = new AlarmRecords(devCode, busLedgerDTO.getLedgerCode(), busLedgerDTO.getTagNumber(), alarmRuleResponseDTO.getAlarmTypeId(), - alarmRuleResponseDTO.getAlarmTypeName(), gas, + alarmRuleResponseDTO.getAlarmTypeName(), data, alarmRuleResponseDTO.getAlarmLevelId(), CommonUtil.sdf4.parse(CommonUtil.DateFormat(upTime)), alarmRuleResponseDTO.getAlarmTypeName(), busLedgerDTO.getPlace(), "0"); this.save(alarmRecords); @@ -389,6 +393,19 @@ return false; } } + //向前端推送websocket报警消息 + threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { + @Override + public void run() { + JSONObject msg = new JSONObject(); + msg.put("alarmType", alarmRuleResponseDTO.getAlarmTypeName()); + msg.put("tagNumber", busWellDTOList.get(0).getTagNumber()); + msg.put("alarmTime", upTime); + msg.put("status", "未读"); + msg.put("typeName", typeName); + webSocket.sendAllMessage(msg.toJSONString()); + } + }); return true; } diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java new file mode 100644 index 0000000..530b5a1 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/PoolConfig.java @@ -0,0 +1,36 @@ +package com.casic.missiles.util; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + + +@Configuration +public class PoolConfig { + + @Bean("syncExecutorPool") + public Executor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心池大小 + taskExecutor.setCorePoolSize(2); + // 最大线程数 + taskExecutor.setMaxPoolSize(10); + // 队列程度 + taskExecutor.setQueueCapacity(100); + // 线程空闲时间 + taskExecutor.setKeepAliveSeconds(20); + // 线程前缀名称 + taskExecutor.setThreadNamePrefix("syncExecutor--"); + // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, + // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 + taskExecutor.setWaitForTasksToCompleteOnShutdown(true); + // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 + taskExecutor.setAwaitTerminationSeconds(20); + // 线程不够用时由调用的线程处理该任务 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + return taskExecutor; + } +} diff --git a/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java new file mode 100644 index 0000000..0e468b0 --- /dev/null +++ b/casic-alarm/src/main/java/com/casic/missiles/util/WebSocket.java @@ -0,0 +1,103 @@ +package com.casic.missiles.util; + +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * 此注解相当于设置访问URL + */ +@Component +@ServerEndpoint("/websocket/{userId}") +public class WebSocket { + private Session session; + + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static Map sessionPool = new HashMap(); + + @OnOpen + public void onOpen(Session session, @PathParam(value = "userId") String userId) { + this.session = session; + webSockets.add(this); + sessionPool.put(userId, session); + System.out.println(userId + "【websocket消息】有新的连接,总数为:" + webSockets.size()); + } + + @OnClose + public void onClose() { + webSockets.remove(this); + System.out.println("【websocket消息】连接断开,总数为:" + webSockets.size()); + } + + @OnMessage + public void onMessage(String message) { + System.out.println("【websocket消息】收到客户端消息:" + message); + } + + // 此为广播消息 + public void sendAllMessage(String message) { + for (WebSocket webSocket : webSockets) { + System.out.println("【websocket消息】广播消息:" + message); + try { + webSocket.session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + // 发送列表消息 + public void sendListMessage(List userIds, String message) { + System.out.println("【websocket消息】列表消息:" + message); + for (String userId : userIds) { + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + // 发送列表消息 + +// public void sendListMessage(List userIds, Object data){ +// System.out.println("【websocket消息】列表消息:"+data); +// for (String userId : userIds) { +// Session session = sessionPool.get(userId); +// if (session != null) { +// try { +//// session.getAsyncRemote().sendText(message); +// session.getAsyncRemote().sendObject(data); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// } +// } + + // 此为单点消息 + public void sendOneMessage(String userId, String message) { + System.out.println("【websocket消息】单点消息:" + message); + Session session = sessionPool.get(userId); + if (session != null) { + try { + session.getAsyncRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} diff --git a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java b/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java deleted file mode 100644 index 8d6b804..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/PushProperties.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -@Data -@Component -@ConfigurationProperties(prefix = "smartcity.getui") -public class PushProperties { - private String appId = null; - private String appKey = null; - private String masterSecret = null; - - public PushProperties() { - } -} diff --git a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java b/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java deleted file mode 100644 index 9c9c4b5..0000000 --- a/casic-data/src/main/java/com/casic/missiles/config/WebSocketConfig.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.casic.missiles.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.server.standard.ServerEndpointExporter; - -// tomcat启动无需配置 -@Configuration -public class WebSocketConfig { - /** - * 注入ServerEndpointExporter, - * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint - */ - @Bean - public ServerEndpointExporter serverEndpointExporter() { - return new ServerEndpointExporter(); - } -} diff --git a/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java b/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java index 84dfdce..3117e09 100644 --- a/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java +++ b/casic-data/src/main/java/com/casic/missiles/modular/data/service/impl/DataGasServiceImpl.java @@ -159,7 +159,7 @@ //写入报警 //1、判断报警是否已存在 if (!alarmRecordsService.isDataAlarmByCode(devCode, Float.valueOf(alarmRuleResponseDTO.getAlarmThreshold()))) { - alarmRecordsService.saveAlarms(devCode, gas, upTime, busWellDTOList, alarmRuleResponseDTO); + alarmRecordsService.saveAlarms(devCode, gas, upTime, busWellDTOList, alarmRuleResponseDTO,DeviceTypeEnum.Methane.getName()); } break; } diff --git a/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java b/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java index a21688a..1bdc6a0 100644 --- a/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java +++ b/casic-device/src/main/java/com/casic/missiles/modular/device/controller/BusPipeMonitorController.java @@ -84,39 +84,17 @@ }); return ReturnUtil.success(iBusLedgerPipeService.watchDataByPipe(requestDTO)); } -// -// @ApiOperation("全生命周期记录查询") -// @PostMapping("/lifecycleRecord") -// @ResponseBody -// public ReturnDTO lifecycleRecord(@RequestBody DeviceMonitorRequestDTO requestDTO) { -// Assert.isFalse(Objects.isNull(requestDTO.getWellId()), () -> { -// throw new BusinessException(BusinessExceptionEnum.ID_NULL); -// }); -// return ReturnUtil.success(wellMonitorService.lifecycleRecord(requestDTO)); -// } -// -// @ApiOperation("全生命周期记录导出") -// @RequestMapping(value = "/lifecycleRecordExp", method = RequestMethod.GET) -// public void lifecycleRecordExp(@RequestBody DeviceMonitorRequestDTO requestDTO, HttpServletResponse response) { -// Assert.isFalse(Objects.isNull(requestDTO.getWellId()), () -> { -// throw new BusinessException(BusinessExceptionEnum.ID_NULL); -// }); -// try { -// iBaseExportService.exportExcel(response, LifeCycleRecordDTO.class, wellMonitorService.lifecycleRecord(requestDTO), ExportEnum.LIFE_CYCLE_RECORD_EXPORT.getSheetName()); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -// -// @ApiOperation("监控设备维护记录") -// @PostMapping("/repairLog") -// @ResponseBody -// public ReturnDTO repairLog(@RequestBody DeviceMonitorRequestDTO requestDTO) { + + @ApiOperation("监控设备维护记录(待开发)") + @PostMapping("/repairLog") + @ResponseBody + public ReturnDTO repairLog(@RequestBody MonitorDataRequestDTO requestDTO) { // Assert.isFalse(Objects.isNull(requestDTO.getWellId()), () -> { // throw new BusinessException(BusinessExceptionEnum.ID_NULL); // }); // return ReturnUtil.success(wellMonitorService.repairLog(requestDTO)); -// } + return ReturnUtil.success(); + } } diff --git a/casic-device/src/main/java/com/casic/missiles/modular/device/dao/mapping/BusLedgerPipeMapper.xml b/casic-device/src/main/java/com/casic/missiles/modular/device/dao/mapping/BusLedgerPipeMapper.xml index 0f09165..8e9d17c 100644 --- a/casic-device/src/main/java/com/casic/missiles/modular/device/dao/mapping/BusLedgerPipeMapper.xml +++ b/casic-device/src/main/java/com/casic/missiles/modular/device/dao/mapping/BusLedgerPipeMapper.xml @@ -114,14 +114,16 @@