Newer
Older
sink / src / main / java / org / flume / alarm / resp / MultiLeakResponse.java
zhout on 2 Mar 2022 12 KB first commit
package org.flume.alarm.resp;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.flume.alarm.AlarmEnumDTO.MultiLeakAlarmEnum;
import org.flume.alarm.base.AbstractResponse;
import org.flume.alarm.base.DeviceTypeEnum;
import org.flume.alarm.core.util.DateUtils;
import org.flume.alarm.core.util.StringUtils;
import org.flume.alarm.manager.*;
import org.flume.alarm.restful.DeviceUtilDTO;
import org.flume.alarm.util.Configure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by lenovo on 2018/1/9.
 */
public class MultiLeakResponse extends AbstractResponse {

    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    private static final String[] mType = {"Data", "Event", "StartupRequest", "SetResponse"};
    //    private static final String[] sensorType = {"multiPressure," + MultiLeakAlarmEnum.PRESS_OVER_THRESH.getIndex(),
//            "multiNoise," + MultiLeakAlarmEnum.NOISE_OVER_THRESH.getIndex()};
    private static final String[] bType = {"MultiLeakFlowData", "MultiLeakPressureData", "MultiLeakNoiseData", "MultiLeakCellData", "MultiLeakConfigSuccess"};

    private static final String[] sensorCodeType = {"000031", "000033", "000032"};//流量,压力,噪声

    @Override
    public void process(String content) {
        ClassPathXmlApplicationContext ac = this.getAc();
        SensorFlowManager sensorFlowManager = ac.getBean(SensorFlowManager.class);
        SensorPressManager sensorPressManager = ac.getBean(SensorPressManager.class);
        NoiseManager noiseManager = ac.getBean(NoiseManager.class);
        AlarmRuleManager alarmRuleManager = ac.getBean(AlarmRuleManager.class);
        AlarmRecordManager alarmRecordManager = ac.getBean(AlarmRecordManager.class);
        ImeiManager imeiManager = ac.getBean(ImeiManager.class);
        SensorCellManager sensorCellManager = ac.getBean(SensorCellManager.class);
        DeviceConfigManager deviceConfigManager = ac.getBean(DeviceConfigManager.class);
        JSONObject json = JSONObject.fromObject(content);
        String devCode = json.get("devCode").toString();
        logger.info("--------RECEIVE:设备编号:" + devCode + ",上传数据:" + json.toString() + "-------");
        JSONObject jsonObject = (JSONObject) json.get("mBody");
        if (mType[0].equals(json.get("mType"))) {//存储上报数据
            String bTypeName = jsonObject.get("bType").toString();
            JSONArray jsonArray = (JSONArray) jsonObject.get("datas");
            if (bType[0].equals(bTypeName)) {//存储流量
                for (int i = 0; i < jsonArray.size(); i++) {
                    try {
                        String multiFlow = ((JSONObject) jsonArray.get(i)).get("multiFlow").toString();
                        String multiForwardFlow = ((JSONObject) jsonArray.get(i)).get("multiForwardFlow").toString();
                        String multiBackwardFlow = ((JSONObject) jsonArray.get(i)).get("multiBackwardFlow").toString();
                        String upTime = ((JSONObject) jsonArray.get(i)).get("uptime").toString();
                        sensorFlowManager.saveData(devCode, multiFlow, multiBackwardFlow, multiForwardFlow, "", upTime);//存采集数据

                        //消除设备异常
                        if (i == jsonArray.size() - 1) {
                            alarmRecordManager.cancelDeviceAlarm(devCode);
                            alarmRecordManager.deleteByDevcode(devCode);
                        }

                        /***
                         * 下面推送采集数据
                         */
                        Map<String, Object> dataMap = new HashMap<String, Object>();
                        Map<String, Object> paramMap = new HashMap<String, Object>();
                        dataMap.put("serviceId", DeviceUtilDTO.devTypeCodeMap.
                                get(DeviceTypeEnum.MultiLeak.toString()));
                        paramMap.put("multiFlow", multiFlow);
                        paramMap.put("multiForwardFlow", multiForwardFlow);
                        paramMap.put("multiBackwardFlow", multiBackwardFlow);
                        dataMap.put("paras", paramMap);
                        DeviceUtilDTO.sendDataMsg(Configure.getProperty("reportInfoURL"),
                                new DeviceUtilDTO("", "false", "updata", "alarm","", "",
                                        DateUtils.sdf4.format(new Date()), dataMap,devCode,
                                        DeviceTypeEnum.MultiLeak.toString()));
                    } catch (IOException e) {
                        e.printStackTrace();
                        logger.error("--------ERROR:设备编号:" + devCode + ",存储数据:" + jsonArray.get(i).toString() + "失败-------");
                    }
                }
            }
            if (bType[1].equals(bTypeName)) {//存储压力
                for (int i = 0; i < jsonArray.size(); i++) {
                    try {

                        String multiPressure = ((JSONObject) jsonArray.get(i)).get("multiPressure").toString();
                        String upTime = ((JSONObject) jsonArray.get(i)).get("uptime").toString();
                        sensorPressManager.saveData(devCode, multiPressure, "", upTime);//存采集数据
                        String thresh = alarmRuleManager.getRuleStr(devCode, DeviceTypeEnum.MultiLeak.toString());//获取报警阈值
                        String threshPress = thresh.split(",", 3)[2];
                        if (StringUtils.isNotBlank(threshPress)) {
                            String threshPressHigh = threshPress.split("\\^")[0];
                            String threshPressLow = threshPress.split("\\^")[1];
                            if (StringUtils.isNotBlank(multiPressure) && StringUtils.isNotBlank(threshPressHigh)
                                    && Float.valueOf(multiPressure) > Float.valueOf(threshPressHigh))
                                alarmRecordManager.saveData(devCode, DeviceTypeEnum.MultiLeak.toString(), multiPressure,
                                        String.valueOf(MultiLeakAlarmEnum.PRESS_OVER_THRESH.getIndex()), "0");
                            if (StringUtils.isNotBlank(multiPressure) && StringUtils.isNotBlank(threshPressLow)
                                    && Float.valueOf(multiPressure) < Float.valueOf(threshPressLow))
                                alarmRecordManager.saveData(devCode, DeviceTypeEnum.MultiLeak.toString(), multiPressure,
                                        String.valueOf(MultiLeakAlarmEnum.PRESS_LOW_THRESH.getIndex()), "0");
                        }

                        /***
                         * 下面推送采集数据
                         */
                        Map<String, Object> dataMap = new HashMap<String, Object>();
                        Map<String, Object> paramMap = new HashMap<String, Object>();
                        dataMap.put("serviceId", DeviceUtilDTO.devTypeCodeMap.
                                get(DeviceTypeEnum.MultiLeak.toString()));
                        paramMap.put("multiPressure", multiPressure);
                        dataMap.put("paras", paramMap);
                        DeviceUtilDTO.sendDataMsg(Configure.getProperty("reportInfoURL"),
                                new DeviceUtilDTO("", "false", "updata", "alarm","", "",
                                        DateUtils.sdf4.format(new Date()), dataMap,devCode,
                                        DeviceTypeEnum.MultiLeak.toString()));

                    } catch (IOException e) {
                        e.printStackTrace();
                        logger.error("--------ERROR:设备编号:" + devCode + ",存储数据:" + jsonArray.get(i).toString() + "失败-------");
                    }
                }
            }
            if (bType[2].equals(bTypeName)) {//存储噪声
                for (int i = 0; i < jsonArray.size(); i++) {
                    try {
                        String multiNoise = ((JSONObject) jsonArray.get(i)).get("multiNoise").toString();
                        String upTime = ((JSONObject) jsonArray.get(i)).get("uptime").toString();
                        noiseManager.saveData(devCode, "", upTime, "", multiNoise,"","","");
                        String thresh = alarmRuleManager.getRuleStr(devCode, DeviceTypeEnum.MultiLeak.toString());//获取报警阈值
                        String threshNoise = thresh.split(",", 3)[1];
                        if (StringUtils.isNotBlank(threshNoise) && Float.valueOf(multiNoise) > Float.valueOf(threshNoise)) {
                            alarmRecordManager.saveData(devCode, DeviceTypeEnum.MultiLeak.toString(), multiNoise,
                                    String.valueOf(MultiLeakAlarmEnum.NOISE_OVER_THRESH.getIndex()), "0");
                        }
                        /***
                         * 下面推送采集数据
                         */
                        Map<String, Object> dataMap = new HashMap<String, Object>();
                        Map<String, Object> paramMap = new HashMap<String, Object>();
                        dataMap.put("serviceId", DeviceUtilDTO.devTypeCodeMap.
                                get(DeviceTypeEnum.MultiLeak.toString()));
                        paramMap.put("multiNoise", multiNoise);
                        dataMap.put("paras", paramMap);
                        DeviceUtilDTO.sendDataMsg(Configure.getProperty("reportInfoURL"),
                                new DeviceUtilDTO("", "false", "updata", "alarm","", "",
                                        DateUtils.sdf4.format(new Date()), dataMap,devCode,
                                        DeviceTypeEnum.MultiLeak.toString()));
                    } catch (IOException e) {
                        e.printStackTrace();
                        logger.error("--------ERROR:设备编号:" + devCode + ",存储数据:" + jsonArray.get(i).toString() + "失败-------");
                    }
                }
            }
            if (bType[3].equals(bTypeName)) {//存储电量
                String cell = jsonObject.get("cell").toString();
                try {
                    sensorCellManager.saveData(devCode, cell);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        } else if (mType[1].equals(json.get("mType"))) {//存储报警事件
            JSONArray jsonArray = (JSONArray) jsonObject.get("eventType");
            for (int i = 0; i < jsonArray.size(); i++) {
                try {
                    alarmRecordManager.saveData(devCode, DeviceTypeEnum.MultiLeak.toString(), "",
                            String.valueOf(MultiLeakAlarmEnum.valueOf(jsonArray.get(i).toString()).getIndex()), "1");
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error("--------ERROR:设备编号:" + devCode + ",存储报警数据:" + jsonArray.get(i).toString() + "失败-------");
                }
            }
        } else if (mType[2].equals(json.get("mType"))) {//三码存储
            try {
                String imei = jsonObject.get("imei").toString();
                String iccid = jsonObject.get("iccid").toString();
                imeiManager.saveData(devCode, imei, iccid);
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("--------ERROR:设备编号:" + devCode + ",更新参数下发状态失败-------");
            }
        } else if (mType[3].equals(json.get("mType"))) {//更新下发参数状态
            try {
                if (DeviceTypeEnum.MultiLeak.name().equals(json.get("devType"))) {
                    if (bType[4].equals(jsonObject.get("bType"))) {
                        String[] devCodes=devCode.split("-");
                        deviceConfigManager.updateMultiSensorStatus(devCodes[0],sensorCodeType[Integer.valueOf(devCodes[1])-1]);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("--------ERROR:设备编号:" + devCode + ",更新参数下发状态失败-------");
            }
        }
    }
}