diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..3bd8f39 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -76,7 +76,7 @@ endPoint.concat(id); } endPoint.concat("?refresh=true"); - logger.debug(entity.toString()); +// logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } @@ -205,7 +205,7 @@ request.setOptions(builder.build()); } Response response = client.performRequest(request); - logger.debug(response.toString()); +// logger.debug(response.toString()); return response; } diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..3bd8f39 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -76,7 +76,7 @@ endPoint.concat(id); } endPoint.concat("?refresh=true"); - logger.debug(entity.toString()); +// logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } @@ -205,7 +205,7 @@ request.setOptions(builder.build()); } Response response = client.performRequest(request); - logger.debug(response.toString()); +// logger.debug(response.toString()); return response; } diff --git a/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java new file mode 100644 index 0000000..d0be9b1 --- /dev/null +++ b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java @@ -0,0 +1,13 @@ +package com.casic.missiles.es; + +public class EsDataFactory { + + private static DataGasEs dataGasEs; + + public static synchronized DataGasEs getDataGasEs(){ + if(dataGasEs==null){ + dataGasEs = new DataGasEs(); + } + return dataGasEs; + } +} diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..3bd8f39 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -76,7 +76,7 @@ endPoint.concat(id); } endPoint.concat("?refresh=true"); - logger.debug(entity.toString()); +// logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } @@ -205,7 +205,7 @@ request.setOptions(builder.build()); } Response response = client.performRequest(request); - logger.debug(response.toString()); +// logger.debug(response.toString()); return response; } diff --git a/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java new file mode 100644 index 0000000..d0be9b1 --- /dev/null +++ b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java @@ -0,0 +1,13 @@ +package com.casic.missiles.es; + +public class EsDataFactory { + + private static DataGasEs dataGasEs; + + public static synchronized DataGasEs getDataGasEs(){ + if(dataGasEs==null){ + dataGasEs = new DataGasEs(); + } + return dataGasEs; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java index f4622ff..82ea79a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java @@ -7,6 +7,7 @@ import com.casic.missiles.enums.BusinessExceptionEnum; import com.casic.missiles.enums.StatusEnum; import com.casic.missiles.es.DataGasEs; +import com.casic.missiles.es.EsDataFactory; import com.casic.missiles.exception.BusinessException; import com.casic.missiles.modular.system.dto.*; import com.casic.missiles.modular.system.dto.monitor.MonitorBaseInfo; @@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -324,7 +326,6 @@ } - //设置预置位 private boolean setPreset(Long serialHandle, int index) { int sum = (0x01 + 0x00 + 0x03 + index / 256 + index % 256) % 0x100; @@ -597,8 +598,8 @@ public boolean control(Integer userId, String command, Integer isStop) { logger.info("device control: userId = " + userId + " command = " + command + " isStop =" + isStop); Long serialHandle = serialHandle(Long.valueOf(userId)); - if(isStop==1){ - if("clean".equals(command)){ + if (isStop == 1) { + if ("clean".equals(command)) { return sendStopCleanCommand(serialHandle); } return sendStopCommand(serialHandle); @@ -1256,13 +1257,12 @@ if (dwBufSize > 10) { // 甲烷数据 handleGasData(deviceInfo, String.valueOf(chars)); - } else { - int flag = ires[3]; } } } - private void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { + @Async + public void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat(" get vertical angle-->").concat(String.valueOf(pitch)).concat("+ get horizontal angle-->").concat(String.valueOf(direction)).concat("+ get gas data-->").concat(String.valueOf(gas))); JSONObject msg = new JSONObject(); msg.put("deviceIp", deviceInfo.getDeviceIp()); @@ -1294,7 +1294,6 @@ if (!"005".equals(data[1])) { return; } - System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat("获得甲烷数据:").concat(gasData)); if (data.length < 8) { return; } @@ -1306,31 +1305,23 @@ pitch = pitch > 180 ? (360 - pitch) : (0 - pitch); direction = new BigDecimal(direction).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); pitch = new BigDecimal(pitch).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); - try { - handlegasDataFromGasData(deviceInfo, concentration, direction, pitch); - } catch (Exception e) { - e.printStackTrace(); - } final double direction1 = direction; final double pitch1 = pitch; - threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { - @Override - public void run() { - MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); - if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { - try { - //自带巡检设备 - if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); - insertGasAlarmProduct(monitorBaseInfo, dataGasEs, alarmThreshold, 1 == alarmFlag ? true : false); - } else { - AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - insertGasAlarm(monitorBaseInfo, dataGasEs, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - } - } catch (Exception e) { - logger.error(e.getMessage()); - } + threadPoolTaskExecutor.taskExecutor().execute(() -> { + MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); + try { + handlegasDataFromGasData(deviceInfo, concentration, direction1, pitch1); + } catch (Exception e) { + e.printStackTrace(); + } + if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { + if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); + insertGasAlarmProduct(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold, 1 == alarmFlag ? true : false); + } else { + AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); + insertGasAlarm(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); } } }); @@ -1351,38 +1342,40 @@ /** * 甲烷数据保存es */ - private DataGasEs insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, - double pitch, Double threshold) throws IOException { - DataGasEs dataGasEs = new DataGasEs(); - dataGasEs.setStationId(monitorBaseInfo.getStationId()); - dataGasEs.setStationName(monitorBaseInfo.getStationName()); - dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); - dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); - dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); - dataGasEs.setDirection(direction); - dataGasEs.setPitch(pitch); - dataGasEs.setConcentration(concentration); - dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); - dataGasEs.setThreshold(threshold); - dataGasService.insertDataGasEs(dataGasEs); - return dataGasEs; + public void insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, + double pitch, Double threshold) { + + try { + DataGasEs dataGasEs = EsDataFactory.getDataGasEs(); + dataGasEs.setStationId(monitorBaseInfo.getStationId()); + dataGasEs.setStationName(monitorBaseInfo.getStationName()); + dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); + dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); + dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); + dataGasEs.setDirection(direction); + dataGasEs.setPitch(pitch); + dataGasEs.setConcentration(concentration); + dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); + dataGasEs.setThreshold(threshold); + dataGasService.insertDataGasEs(dataGasEs); + } catch (Exception e) { + e.printStackTrace(); + } } /** * 生成报警(自动消警) */ - private void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold, - boolean alarmFlag) { + public void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold, + boolean alarmFlag) { // 判断报警、消警 if (alarmFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), - Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); @@ -1391,7 +1384,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1399,7 +1392,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } else { //更新和推送设备在线 @@ -1407,13 +1400,13 @@ this.sendDeviceStatusData(monitorBaseInfo.getDeviceIp(), "onLine"); } // 自动消警(同一角度) - if (alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())) > 0) { + if (alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch) > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } } } @@ -1421,16 +1414,16 @@ /** * 生成报警(自动消警) */ - private void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold) { + public void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold) { // 判断报警、消警 - if (threshold != null && dataGas.getConcentration() > threshold) { + if (threshold != null && concentration > threshold) { boolean resumeFlag = false; // 报警,注意消警时限 QueryWrapper query = new QueryWrapper<>(); - query.eq("MONITOR_ID", dataGas.getMonitorId()); + query.eq("MONITOR_ID", monitorBaseInfo.getMonitorId()); query.eq("ALARM_STATUS", "1"); - query.eq("ALARM_DIRECTION", dataGas.getDirection()); - query.eq("ALARM_PITCH", dataGas.getPitch()); + query.eq("ALARM_DIRECTION", direction); + query.eq("ALARM_PITCH", pitch); query.isNotNull("RESUME_TIME"); List resumeList = alarmRecordService.list(query); for (AlarmRecord alarmRecord : resumeList) { @@ -1444,12 +1437,12 @@ if (!resumeFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, + pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); //更新设备状态为报警 @@ -1457,7 +1450,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1465,7 +1458,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); //增加门禁报警控制,只针对天讯通设备 if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { @@ -1478,14 +1471,14 @@ } } else { // 自动消警(同一角度) - int count = alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())); + int count = alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); if (count > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..3bd8f39 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -76,7 +76,7 @@ endPoint.concat(id); } endPoint.concat("?refresh=true"); - logger.debug(entity.toString()); +// logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } @@ -205,7 +205,7 @@ request.setOptions(builder.build()); } Response response = client.performRequest(request); - logger.debug(response.toString()); +// logger.debug(response.toString()); return response; } diff --git a/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java new file mode 100644 index 0000000..d0be9b1 --- /dev/null +++ b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java @@ -0,0 +1,13 @@ +package com.casic.missiles.es; + +public class EsDataFactory { + + private static DataGasEs dataGasEs; + + public static synchronized DataGasEs getDataGasEs(){ + if(dataGasEs==null){ + dataGasEs = new DataGasEs(); + } + return dataGasEs; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java index f4622ff..82ea79a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java @@ -7,6 +7,7 @@ import com.casic.missiles.enums.BusinessExceptionEnum; import com.casic.missiles.enums.StatusEnum; import com.casic.missiles.es.DataGasEs; +import com.casic.missiles.es.EsDataFactory; import com.casic.missiles.exception.BusinessException; import com.casic.missiles.modular.system.dto.*; import com.casic.missiles.modular.system.dto.monitor.MonitorBaseInfo; @@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -324,7 +326,6 @@ } - //设置预置位 private boolean setPreset(Long serialHandle, int index) { int sum = (0x01 + 0x00 + 0x03 + index / 256 + index % 256) % 0x100; @@ -597,8 +598,8 @@ public boolean control(Integer userId, String command, Integer isStop) { logger.info("device control: userId = " + userId + " command = " + command + " isStop =" + isStop); Long serialHandle = serialHandle(Long.valueOf(userId)); - if(isStop==1){ - if("clean".equals(command)){ + if (isStop == 1) { + if ("clean".equals(command)) { return sendStopCleanCommand(serialHandle); } return sendStopCommand(serialHandle); @@ -1256,13 +1257,12 @@ if (dwBufSize > 10) { // 甲烷数据 handleGasData(deviceInfo, String.valueOf(chars)); - } else { - int flag = ires[3]; } } } - private void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { + @Async + public void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat(" get vertical angle-->").concat(String.valueOf(pitch)).concat("+ get horizontal angle-->").concat(String.valueOf(direction)).concat("+ get gas data-->").concat(String.valueOf(gas))); JSONObject msg = new JSONObject(); msg.put("deviceIp", deviceInfo.getDeviceIp()); @@ -1294,7 +1294,6 @@ if (!"005".equals(data[1])) { return; } - System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat("获得甲烷数据:").concat(gasData)); if (data.length < 8) { return; } @@ -1306,31 +1305,23 @@ pitch = pitch > 180 ? (360 - pitch) : (0 - pitch); direction = new BigDecimal(direction).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); pitch = new BigDecimal(pitch).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); - try { - handlegasDataFromGasData(deviceInfo, concentration, direction, pitch); - } catch (Exception e) { - e.printStackTrace(); - } final double direction1 = direction; final double pitch1 = pitch; - threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { - @Override - public void run() { - MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); - if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { - try { - //自带巡检设备 - if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); - insertGasAlarmProduct(monitorBaseInfo, dataGasEs, alarmThreshold, 1 == alarmFlag ? true : false); - } else { - AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - insertGasAlarm(monitorBaseInfo, dataGasEs, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - } - } catch (Exception e) { - logger.error(e.getMessage()); - } + threadPoolTaskExecutor.taskExecutor().execute(() -> { + MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); + try { + handlegasDataFromGasData(deviceInfo, concentration, direction1, pitch1); + } catch (Exception e) { + e.printStackTrace(); + } + if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { + if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); + insertGasAlarmProduct(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold, 1 == alarmFlag ? true : false); + } else { + AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); + insertGasAlarm(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); } } }); @@ -1351,38 +1342,40 @@ /** * 甲烷数据保存es */ - private DataGasEs insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, - double pitch, Double threshold) throws IOException { - DataGasEs dataGasEs = new DataGasEs(); - dataGasEs.setStationId(monitorBaseInfo.getStationId()); - dataGasEs.setStationName(monitorBaseInfo.getStationName()); - dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); - dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); - dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); - dataGasEs.setDirection(direction); - dataGasEs.setPitch(pitch); - dataGasEs.setConcentration(concentration); - dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); - dataGasEs.setThreshold(threshold); - dataGasService.insertDataGasEs(dataGasEs); - return dataGasEs; + public void insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, + double pitch, Double threshold) { + + try { + DataGasEs dataGasEs = EsDataFactory.getDataGasEs(); + dataGasEs.setStationId(monitorBaseInfo.getStationId()); + dataGasEs.setStationName(monitorBaseInfo.getStationName()); + dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); + dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); + dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); + dataGasEs.setDirection(direction); + dataGasEs.setPitch(pitch); + dataGasEs.setConcentration(concentration); + dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); + dataGasEs.setThreshold(threshold); + dataGasService.insertDataGasEs(dataGasEs); + } catch (Exception e) { + e.printStackTrace(); + } } /** * 生成报警(自动消警) */ - private void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold, - boolean alarmFlag) { + public void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold, + boolean alarmFlag) { // 判断报警、消警 if (alarmFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), - Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); @@ -1391,7 +1384,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1399,7 +1392,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } else { //更新和推送设备在线 @@ -1407,13 +1400,13 @@ this.sendDeviceStatusData(monitorBaseInfo.getDeviceIp(), "onLine"); } // 自动消警(同一角度) - if (alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())) > 0) { + if (alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch) > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } } } @@ -1421,16 +1414,16 @@ /** * 生成报警(自动消警) */ - private void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold) { + public void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold) { // 判断报警、消警 - if (threshold != null && dataGas.getConcentration() > threshold) { + if (threshold != null && concentration > threshold) { boolean resumeFlag = false; // 报警,注意消警时限 QueryWrapper query = new QueryWrapper<>(); - query.eq("MONITOR_ID", dataGas.getMonitorId()); + query.eq("MONITOR_ID", monitorBaseInfo.getMonitorId()); query.eq("ALARM_STATUS", "1"); - query.eq("ALARM_DIRECTION", dataGas.getDirection()); - query.eq("ALARM_PITCH", dataGas.getPitch()); + query.eq("ALARM_DIRECTION", direction); + query.eq("ALARM_PITCH", pitch); query.isNotNull("RESUME_TIME"); List resumeList = alarmRecordService.list(query); for (AlarmRecord alarmRecord : resumeList) { @@ -1444,12 +1437,12 @@ if (!resumeFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, + pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); //更新设备状态为报警 @@ -1457,7 +1450,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1465,7 +1458,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); //增加门禁报警控制,只针对天讯通设备 if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { @@ -1478,14 +1471,14 @@ } } else { // 自动消警(同一角度) - int count = alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())); + int count = alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); if (count > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java b/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java index 4037b1b..f201d01 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java @@ -9,6 +9,7 @@ import com.casic.missiles.modular.system.service.IBusMonipoiInfoService; import com.casic.missiles.modular.system.service.IHCNetService; import com.casic.missiles.modular.system.service.impl.HCNetServiceImpl; +import com.casic.missiles.modular.system.util.PoolConfig; import com.sun.jna.Memory; import com.sun.jna.Pointer; import org.slf4j.Logger; @@ -48,6 +49,9 @@ @Value("${casic.isOpen}") private int isOpen; + @Autowired + private PoolConfig threadPoolTaskExecutor; + @Override public void run(ApplicationArguments args) throws Exception { // 查询全部设备 @@ -66,6 +70,7 @@ if (ObjectUtil.isNotEmpty(monipoiInfo.getDeviceIp())) { Runnable worker = this.new GasDataRunnable(monipoiInfo.getDeviceIp(), monipoiInfo.getDeviceUser(), monipoiInfo.getDevicePassword()); executor.execute(worker); +// threadPoolTaskExecutor.taskExecutor().execute(worker); } } } diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..3bd8f39 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -76,7 +76,7 @@ endPoint.concat(id); } endPoint.concat("?refresh=true"); - logger.debug(entity.toString()); +// logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } @@ -205,7 +205,7 @@ request.setOptions(builder.build()); } Response response = client.performRequest(request); - logger.debug(response.toString()); +// logger.debug(response.toString()); return response; } diff --git a/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java new file mode 100644 index 0000000..d0be9b1 --- /dev/null +++ b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java @@ -0,0 +1,13 @@ +package com.casic.missiles.es; + +public class EsDataFactory { + + private static DataGasEs dataGasEs; + + public static synchronized DataGasEs getDataGasEs(){ + if(dataGasEs==null){ + dataGasEs = new DataGasEs(); + } + return dataGasEs; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java index f4622ff..82ea79a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java @@ -7,6 +7,7 @@ import com.casic.missiles.enums.BusinessExceptionEnum; import com.casic.missiles.enums.StatusEnum; import com.casic.missiles.es.DataGasEs; +import com.casic.missiles.es.EsDataFactory; import com.casic.missiles.exception.BusinessException; import com.casic.missiles.modular.system.dto.*; import com.casic.missiles.modular.system.dto.monitor.MonitorBaseInfo; @@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -324,7 +326,6 @@ } - //设置预置位 private boolean setPreset(Long serialHandle, int index) { int sum = (0x01 + 0x00 + 0x03 + index / 256 + index % 256) % 0x100; @@ -597,8 +598,8 @@ public boolean control(Integer userId, String command, Integer isStop) { logger.info("device control: userId = " + userId + " command = " + command + " isStop =" + isStop); Long serialHandle = serialHandle(Long.valueOf(userId)); - if(isStop==1){ - if("clean".equals(command)){ + if (isStop == 1) { + if ("clean".equals(command)) { return sendStopCleanCommand(serialHandle); } return sendStopCommand(serialHandle); @@ -1256,13 +1257,12 @@ if (dwBufSize > 10) { // 甲烷数据 handleGasData(deviceInfo, String.valueOf(chars)); - } else { - int flag = ires[3]; } } } - private void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { + @Async + public void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat(" get vertical angle-->").concat(String.valueOf(pitch)).concat("+ get horizontal angle-->").concat(String.valueOf(direction)).concat("+ get gas data-->").concat(String.valueOf(gas))); JSONObject msg = new JSONObject(); msg.put("deviceIp", deviceInfo.getDeviceIp()); @@ -1294,7 +1294,6 @@ if (!"005".equals(data[1])) { return; } - System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat("获得甲烷数据:").concat(gasData)); if (data.length < 8) { return; } @@ -1306,31 +1305,23 @@ pitch = pitch > 180 ? (360 - pitch) : (0 - pitch); direction = new BigDecimal(direction).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); pitch = new BigDecimal(pitch).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); - try { - handlegasDataFromGasData(deviceInfo, concentration, direction, pitch); - } catch (Exception e) { - e.printStackTrace(); - } final double direction1 = direction; final double pitch1 = pitch; - threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { - @Override - public void run() { - MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); - if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { - try { - //自带巡检设备 - if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); - insertGasAlarmProduct(monitorBaseInfo, dataGasEs, alarmThreshold, 1 == alarmFlag ? true : false); - } else { - AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - insertGasAlarm(monitorBaseInfo, dataGasEs, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - } - } catch (Exception e) { - logger.error(e.getMessage()); - } + threadPoolTaskExecutor.taskExecutor().execute(() -> { + MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); + try { + handlegasDataFromGasData(deviceInfo, concentration, direction1, pitch1); + } catch (Exception e) { + e.printStackTrace(); + } + if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { + if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); + insertGasAlarmProduct(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold, 1 == alarmFlag ? true : false); + } else { + AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); + insertGasAlarm(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); } } }); @@ -1351,38 +1342,40 @@ /** * 甲烷数据保存es */ - private DataGasEs insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, - double pitch, Double threshold) throws IOException { - DataGasEs dataGasEs = new DataGasEs(); - dataGasEs.setStationId(monitorBaseInfo.getStationId()); - dataGasEs.setStationName(monitorBaseInfo.getStationName()); - dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); - dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); - dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); - dataGasEs.setDirection(direction); - dataGasEs.setPitch(pitch); - dataGasEs.setConcentration(concentration); - dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); - dataGasEs.setThreshold(threshold); - dataGasService.insertDataGasEs(dataGasEs); - return dataGasEs; + public void insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, + double pitch, Double threshold) { + + try { + DataGasEs dataGasEs = EsDataFactory.getDataGasEs(); + dataGasEs.setStationId(monitorBaseInfo.getStationId()); + dataGasEs.setStationName(monitorBaseInfo.getStationName()); + dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); + dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); + dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); + dataGasEs.setDirection(direction); + dataGasEs.setPitch(pitch); + dataGasEs.setConcentration(concentration); + dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); + dataGasEs.setThreshold(threshold); + dataGasService.insertDataGasEs(dataGasEs); + } catch (Exception e) { + e.printStackTrace(); + } } /** * 生成报警(自动消警) */ - private void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold, - boolean alarmFlag) { + public void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold, + boolean alarmFlag) { // 判断报警、消警 if (alarmFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), - Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); @@ -1391,7 +1384,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1399,7 +1392,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } else { //更新和推送设备在线 @@ -1407,13 +1400,13 @@ this.sendDeviceStatusData(monitorBaseInfo.getDeviceIp(), "onLine"); } // 自动消警(同一角度) - if (alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())) > 0) { + if (alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch) > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } } } @@ -1421,16 +1414,16 @@ /** * 生成报警(自动消警) */ - private void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold) { + public void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold) { // 判断报警、消警 - if (threshold != null && dataGas.getConcentration() > threshold) { + if (threshold != null && concentration > threshold) { boolean resumeFlag = false; // 报警,注意消警时限 QueryWrapper query = new QueryWrapper<>(); - query.eq("MONITOR_ID", dataGas.getMonitorId()); + query.eq("MONITOR_ID", monitorBaseInfo.getMonitorId()); query.eq("ALARM_STATUS", "1"); - query.eq("ALARM_DIRECTION", dataGas.getDirection()); - query.eq("ALARM_PITCH", dataGas.getPitch()); + query.eq("ALARM_DIRECTION", direction); + query.eq("ALARM_PITCH", pitch); query.isNotNull("RESUME_TIME"); List resumeList = alarmRecordService.list(query); for (AlarmRecord alarmRecord : resumeList) { @@ -1444,12 +1437,12 @@ if (!resumeFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, + pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); //更新设备状态为报警 @@ -1457,7 +1450,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1465,7 +1458,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); //增加门禁报警控制,只针对天讯通设备 if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { @@ -1478,14 +1471,14 @@ } } else { // 自动消警(同一角度) - int count = alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())); + int count = alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); if (count > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java b/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java index 4037b1b..f201d01 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java @@ -9,6 +9,7 @@ import com.casic.missiles.modular.system.service.IBusMonipoiInfoService; import com.casic.missiles.modular.system.service.IHCNetService; import com.casic.missiles.modular.system.service.impl.HCNetServiceImpl; +import com.casic.missiles.modular.system.util.PoolConfig; import com.sun.jna.Memory; import com.sun.jna.Pointer; import org.slf4j.Logger; @@ -48,6 +49,9 @@ @Value("${casic.isOpen}") private int isOpen; + @Autowired + private PoolConfig threadPoolTaskExecutor; + @Override public void run(ApplicationArguments args) throws Exception { // 查询全部设备 @@ -66,6 +70,7 @@ if (ObjectUtil.isNotEmpty(monipoiInfo.getDeviceIp())) { Runnable worker = this.new GasDataRunnable(monipoiInfo.getDeviceIp(), monipoiInfo.getDeviceUser(), monipoiInfo.getDevicePassword()); executor.execute(worker); +// threadPoolTaskExecutor.taskExecutor().execute(worker); } } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java index 638095e..8f195bd 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java @@ -49,15 +49,16 @@ taskExecutor.setMaxPoolSize(32); // 队列程度 taskExecutor.setQueueCapacity(800); +// taskExecutor.setAllowCoreThreadTimeOut(true); // 线程空闲时间 - taskExecutor.setKeepAliveSeconds(60); + taskExecutor.setKeepAliveSeconds(30); // 线程前缀名称 taskExecutor.setThreadNamePrefix("syncExecutor--"); // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 taskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 - taskExecutor.setAwaitTerminationSeconds(60); + taskExecutor.setAwaitTerminationSeconds(30); // 线程不够用时由调用的线程处理该任务 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return taskExecutor; diff --git a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java index cf1ac24..3bd8f39 100644 --- a/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java +++ b/casic-common/src/main/java/com/casic/missiles/es/ElasticSearchUtil.java @@ -76,7 +76,7 @@ endPoint.concat(id); } endPoint.concat("?refresh=true"); - logger.debug(entity.toString()); +// logger.debug(entity.toString()); return performRequest(method, endPoint, JSON.toJSONString(entity)); } @@ -205,7 +205,7 @@ request.setOptions(builder.build()); } Response response = client.performRequest(request); - logger.debug(response.toString()); +// logger.debug(response.toString()); return response; } diff --git a/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java new file mode 100644 index 0000000..d0be9b1 --- /dev/null +++ b/casic-common/src/main/java/com/casic/missiles/es/EsDataFactory.java @@ -0,0 +1,13 @@ +package com.casic.missiles.es; + +public class EsDataFactory { + + private static DataGasEs dataGasEs; + + public static synchronized DataGasEs getDataGasEs(){ + if(dataGasEs==null){ + dataGasEs = new DataGasEs(); + } + return dataGasEs; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java index f4622ff..82ea79a 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/HCNetServiceImpl.java @@ -7,6 +7,7 @@ import com.casic.missiles.enums.BusinessExceptionEnum; import com.casic.missiles.enums.StatusEnum; import com.casic.missiles.es.DataGasEs; +import com.casic.missiles.es.EsDataFactory; import com.casic.missiles.exception.BusinessException; import com.casic.missiles.modular.system.dto.*; import com.casic.missiles.modular.system.dto.monitor.MonitorBaseInfo; @@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -324,7 +326,6 @@ } - //设置预置位 private boolean setPreset(Long serialHandle, int index) { int sum = (0x01 + 0x00 + 0x03 + index / 256 + index % 256) % 0x100; @@ -597,8 +598,8 @@ public boolean control(Integer userId, String command, Integer isStop) { logger.info("device control: userId = " + userId + " command = " + command + " isStop =" + isStop); Long serialHandle = serialHandle(Long.valueOf(userId)); - if(isStop==1){ - if("clean".equals(command)){ + if (isStop == 1) { + if ("clean".equals(command)) { return sendStopCleanCommand(serialHandle); } return sendStopCommand(serialHandle); @@ -1256,13 +1257,12 @@ if (dwBufSize > 10) { // 甲烷数据 handleGasData(deviceInfo, String.valueOf(chars)); - } else { - int flag = ires[3]; } } } - private void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { + @Async + public void handlegasDataFromGasData(DeviceInfo deviceInfo, Double gas, Double direction, Double pitch) { System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat(" get vertical angle-->").concat(String.valueOf(pitch)).concat("+ get horizontal angle-->").concat(String.valueOf(direction)).concat("+ get gas data-->").concat(String.valueOf(gas))); JSONObject msg = new JSONObject(); msg.put("deviceIp", deviceInfo.getDeviceIp()); @@ -1294,7 +1294,6 @@ if (!"005".equals(data[1])) { return; } - System.out.println(new Date() + ":" + deviceInfo.getDeviceIp().concat("获得甲烷数据:").concat(gasData)); if (data.length < 8) { return; } @@ -1306,31 +1305,23 @@ pitch = pitch > 180 ? (360 - pitch) : (0 - pitch); direction = new BigDecimal(direction).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); pitch = new BigDecimal(pitch).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); - try { - handlegasDataFromGasData(deviceInfo, concentration, direction, pitch); - } catch (Exception e) { - e.printStackTrace(); - } final double direction1 = direction; final double pitch1 = pitch; - threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { - @Override - public void run() { - MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); - if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { - try { - //自带巡检设备 - if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); - insertGasAlarmProduct(monitorBaseInfo, dataGasEs, alarmThreshold, 1 == alarmFlag ? true : false); - } else { - AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); - DataGasEs dataGasEs = insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - insertGasAlarm(monitorBaseInfo, dataGasEs, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); - } - } catch (Exception e) { - logger.error(e.getMessage()); - } + threadPoolTaskExecutor.taskExecutor().execute(() -> { + MonitorBaseInfo monitorBaseInfo = monipoiInfoService.selectInfoByDeviceIp(deviceInfo.getDeviceIp()); + try { + handlegasDataFromGasData(deviceInfo, concentration, direction1, pitch1); + } catch (Exception e) { + e.printStackTrace(); + } + if (ObjectUtil.isNotEmpty(monitorBaseInfo)) { + if (DeviceTypeEnum.XST_TYPE.getCode().equals(monitorBaseInfo.getType())) { + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold); + insertGasAlarmProduct(monitorBaseInfo, concentration, direction1, pitch1, alarmThreshold, 1 == alarmFlag ? true : false); + } else { + AlarmRule alarmRule = alarmRuleService.selectByMonitor(monitorBaseInfo.getMonitorId()); + insertDataGasEs(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); + insertGasAlarm(monitorBaseInfo, concentration, direction1, pitch1, ObjectUtil.isNotEmpty(alarmRule) ? alarmRule.getHigh() : null); } } }); @@ -1351,38 +1342,40 @@ /** * 甲烷数据保存es */ - private DataGasEs insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, - double pitch, Double threshold) throws IOException { - DataGasEs dataGasEs = new DataGasEs(); - dataGasEs.setStationId(monitorBaseInfo.getStationId()); - dataGasEs.setStationName(monitorBaseInfo.getStationName()); - dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); - dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); - dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); - dataGasEs.setDirection(direction); - dataGasEs.setPitch(pitch); - dataGasEs.setConcentration(concentration); - dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); - dataGasEs.setThreshold(threshold); - dataGasService.insertDataGasEs(dataGasEs); - return dataGasEs; + public void insertDataGasEs(MonitorBaseInfo monitorBaseInfo, Double concentration, double direction, + double pitch, Double threshold) { + + try { + DataGasEs dataGasEs = EsDataFactory.getDataGasEs(); + dataGasEs.setStationId(monitorBaseInfo.getStationId()); + dataGasEs.setStationName(monitorBaseInfo.getStationName()); + dataGasEs.setMonitorId(monitorBaseInfo.getMonitorId()); + dataGasEs.setMonitorName(monitorBaseInfo.getMonitorName()); + dataGasEs.setDevcode(monitorBaseInfo.getDevcode()); + dataGasEs.setDirection(direction); + dataGasEs.setPitch(pitch); + dataGasEs.setConcentration(concentration); + dataGasEs.setLogTime(DateUtil.formatDateTime(new Date())); + dataGasEs.setThreshold(threshold); + dataGasService.insertDataGasEs(dataGasEs); + } catch (Exception e) { + e.printStackTrace(); + } } /** * 生成报警(自动消警) */ - private void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold, - boolean alarmFlag) { + public void insertGasAlarmProduct(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold, + boolean alarmFlag) { // 判断报警、消警 if (alarmFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), - Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); @@ -1391,7 +1384,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1399,7 +1392,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } else { //更新和推送设备在线 @@ -1407,13 +1400,13 @@ this.sendDeviceStatusData(monitorBaseInfo.getDeviceIp(), "onLine"); } // 自动消警(同一角度) - if (alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())) > 0) { + if (alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch) > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } } } @@ -1421,16 +1414,16 @@ /** * 生成报警(自动消警) */ - private void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, DataGasEs dataGas, Double threshold) { + public void insertGasAlarm(MonitorBaseInfo monitorBaseInfo, Double concentration, Double direction, Double pitch, Double threshold) { // 判断报警、消警 - if (threshold != null && dataGas.getConcentration() > threshold) { + if (threshold != null && concentration > threshold) { boolean resumeFlag = false; // 报警,注意消警时限 QueryWrapper query = new QueryWrapper<>(); - query.eq("MONITOR_ID", dataGas.getMonitorId()); + query.eq("MONITOR_ID", monitorBaseInfo.getMonitorId()); query.eq("ALARM_STATUS", "1"); - query.eq("ALARM_DIRECTION", dataGas.getDirection()); - query.eq("ALARM_PITCH", dataGas.getPitch()); + query.eq("ALARM_DIRECTION", direction); + query.eq("ALARM_PITCH", pitch); query.isNotNull("RESUME_TIME"); List resumeList = alarmRecordService.list(query); for (AlarmRecord alarmRecord : resumeList) { @@ -1444,12 +1437,12 @@ if (!resumeFlag) { // 消除同一角度已有报警 - alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), - Double.valueOf(dataGas.getPitch())); + alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, + pitch); // 插入新报警 - AlarmRecord alarmRecord = new AlarmRecord(dataGas.getMonitorId(), - StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, dataGas.getConcentration(), - threshold, Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch()), + AlarmRecord alarmRecord = new AlarmRecord(monitorBaseInfo.getMonitorId(), + StatusEnum.GAS_ALARM, StatusEnum.GAS_ALARM_CONTENT, concentration, + threshold, direction, pitch, StatusEnum.ALARM_ON, monitorBaseInfo.getLineNum()); alarmRecordService.save(alarmRecord); //更新设备状态为报警 @@ -1457,7 +1450,7 @@ //推送前端 JSONObject msg = new JSONObject(); msg.put("type", "gasAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); msg.put("concentration", alarmRecord.getAlarmValue()); msg.put("alarmDirection", alarmRecord.getAlarmDirection()); @@ -1465,7 +1458,7 @@ msg.put("alarmId", alarmRecord.getId()); msg.put("alarmTime", alarmRecord.getAlarmTime()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******sendwebSocketAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******sendwebSocketAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); //增加门禁报警控制,只针对天讯通设备 if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { @@ -1478,14 +1471,14 @@ } } else { // 自动消警(同一角度) - int count = alarmRecordService.clearByMonitor(dataGas.getMonitorId(), Double.valueOf(dataGas.getDirection()), Double.valueOf(dataGas.getPitch())); + int count = alarmRecordService.clearByMonitor(monitorBaseInfo.getMonitorId(), direction, pitch); if (count > 0) { JSONObject msg = new JSONObject(); msg.put("type", "cancelAlarm"); - msg.put("monitorId", dataGas.getMonitorId()); + msg.put("monitorId", monitorBaseInfo.getMonitorId()); msg.put("monitorName", monitorBaseInfo.getMonitorName()); webSocket.sendAllMessage(msg.toJSONString()); - logger.info("*******webSocketcancelAlarmRecord,monitorid=" + dataGas.getMonitorId() + ",time:" + new Date()); + logger.info("*******webSocketcancelAlarmRecord,monitorid=" + monitorBaseInfo.getMonitorId() + ",time:" + new Date()); } if (DeviceTypeEnum.SELF_TYPE.getCode().equals(monitorBaseInfo.getType())) { threadPoolTaskExecutor.taskExecutor().execute(new Runnable() { diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java b/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java index 4037b1b..f201d01 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/task/CollectGasDataTask.java @@ -9,6 +9,7 @@ import com.casic.missiles.modular.system.service.IBusMonipoiInfoService; import com.casic.missiles.modular.system.service.IHCNetService; import com.casic.missiles.modular.system.service.impl.HCNetServiceImpl; +import com.casic.missiles.modular.system.util.PoolConfig; import com.sun.jna.Memory; import com.sun.jna.Pointer; import org.slf4j.Logger; @@ -48,6 +49,9 @@ @Value("${casic.isOpen}") private int isOpen; + @Autowired + private PoolConfig threadPoolTaskExecutor; + @Override public void run(ApplicationArguments args) throws Exception { // 查询全部设备 @@ -66,6 +70,7 @@ if (ObjectUtil.isNotEmpty(monipoiInfo.getDeviceIp())) { Runnable worker = this.new GasDataRunnable(monipoiInfo.getDeviceIp(), monipoiInfo.getDeviceUser(), monipoiInfo.getDevicePassword()); executor.execute(worker); +// threadPoolTaskExecutor.taskExecutor().execute(worker); } } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java index 638095e..8f195bd 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/util/PoolConfig.java @@ -49,15 +49,16 @@ taskExecutor.setMaxPoolSize(32); // 队列程度 taskExecutor.setQueueCapacity(800); +// taskExecutor.setAllowCoreThreadTimeOut(true); // 线程空闲时间 - taskExecutor.setKeepAliveSeconds(60); + taskExecutor.setKeepAliveSeconds(30); // 线程前缀名称 taskExecutor.setThreadNamePrefix("syncExecutor--"); // 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean, // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 taskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 任务的等待时间 如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。 - taskExecutor.setAwaitTerminationSeconds(60); + taskExecutor.setAwaitTerminationSeconds(30); // 线程不够用时由调用的线程处理该任务 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return taskExecutor; diff --git a/casic-web/src/main/resources/config/application-dev.yml b/casic-web/src/main/resources/config/application-dev.yml index 1805400..959f5b6 100644 --- a/casic-web/src/main/resources/config/application-dev.yml +++ b/casic-web/src/main/resources/config/application-dev.yml @@ -2,6 +2,9 @@ port: 11306 servlet: context-path: /pan-tilt + tomcat: + threads: + max: 600 ################### spring配置 ################### spring: datasource: