// Residue from the repository web view (not part of the Java source):
// Newer / Older
// sink/src/main/java/org/flume/alarm/resp/WasteGasResponse.java
// zhout on 2 Mar 2022, 11 KB, first commit
package org.flume.alarm.resp;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.flume.alarm.AlarmEnumDTO.LiquidAlarmEnum;
import org.flume.alarm.AlarmEnumDTO.WasteGasAlarmEnum;
import org.flume.alarm.base.AbstractResponse;
import org.flume.alarm.base.DeviceTypeEnum;
import org.flume.alarm.core.util.DateUtils;
import org.flume.alarm.core.util.StringUtils;
import org.flume.alarm.domain.Device;
import org.flume.alarm.dto.LiquidDTO;
import org.flume.alarm.manager.*;
import org.flume.alarm.restful.AlarmDataCenterUtilDTO;
import org.flume.alarm.restful.AlarmUtilDTO;
import org.flume.alarm.restful.DeviceUtilDTO;
import org.flume.alarm.util.Configure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Handles JSON messages uploaded by waste-gas devices.
 *
 * <p>The message's {@code mType} field selects the action:
 * <ul>
 *   <li>{@code Data} - every entry of {@code mBody.datas} is stored; the last entry is additionally
 *       checked against the gas thresholds, may create an alarm record, and may be pushed to the
 *       configured remote endpoints;</li>
 *   <li>{@code Event} - currently ignored;</li>
 *   <li>{@code SetResponse} - marks the device configuration as applied when the device reports
 *       {@code WasteGasConfigSuccess};</li>
 *   <li>{@code StartupRequest} - stores the IMEI / ICCID reported by the device.</li>
 * </ul>
 *
 * Created by lenovo on 2018/1/9.
 */
public class WasteGasResponse extends AbstractResponse {

    private static final Logger logger = LoggerFactory.getLogger(WasteGasResponse.class);

    /** Recognised values of the {@code mType} field. */
    private static final String TYPE_DATA = "Data";
    private static final String TYPE_EVENT = "Event";
    private static final String TYPE_SET_RESPONSE = "SetResponse";
    private static final String TYPE_STARTUP_REQUEST = "StartupRequest";

    /** Placeholder written into the alarm value for a gas that is within its limit. */
    private static final String NO_ALARM = "*";
    /** Alarm value produced when none of the four gases is out of range. */
    private static final String NO_ALARM_VALUE = "*,*,*,*";

    /** A CO reading strictly above this value raises an alarm. */
    private static final float CO_MAX = 50f;
    /** An O2 reading raises an alarm unless it lies strictly between these two bounds. */
    private static final float O2_LOW = 18f;
    private static final float O2_HIGH = 23f;
    /** An H2S reading strictly above this value raises an alarm. */
    private static final float H2S_MAX = 10f;
    /** A CH4 reading strictly above this value raises an alarm. */
    private static final float CH4_MAX = 2f;

    /** Delay before an asynchronous push task starts. */
    private static final long PUSH_DELAY_MS = 100L;

    /**
     * Parses one uploaded message and dispatches it according to its {@code mType}.
     *
     * @param content the raw JSON message; must contain {@code devCode}, and {@code mBody} for every
     *                message type that is acted upon
     */
    @Override
    public void process(String content) {
        // All beans are looked up eagerly, in a fixed order, regardless of the message type.
        ClassPathXmlApplicationContext ac = this.getAc();
        WasteGasManager wasteGasManager = ac.getBean(WasteGasManager.class);
        DeviceConfigManager deviceConfigManager = ac.getBean(DeviceConfigManager.class);
        DeviceManager deviceManager = ac.getBean(DeviceManager.class);
        AlarmRecordManager alarmRecordManager = ac.getBean(AlarmRecordManager.class);
        ImeiManager imeiManager = ac.getBean(ImeiManager.class);

        JSONObject json = JSONObject.fromObject(content);
        String devCode = json.get("devCode").toString();
        logger.info("--------RECEIVE:设备编号:{},上传数据:{}-------", devCode, json);
        JSONObject body = (JSONObject) json.get("mBody");
        Object messageType = json.get("mType");

        if (TYPE_DATA.equals(messageType)) {
            storeReadings(body, devCode, wasteGasManager, deviceManager, alarmRecordManager);
        } else if (TYPE_EVENT.equals(messageType)) {
            // Device-reported events are not stored; an earlier implementation that saved them
            // through the alarm record manager was disabled.
        } else if (TYPE_SET_RESPONSE.equals(messageType)) {
            confirmConfiguration(json, body, devCode, deviceConfigManager);
        } else if (TYPE_STARTUP_REQUEST.equals(messageType)) {
            storeIdentifiers(body, devCode, imeiManager);
        }
    }

    /**
     * Stores every reading in {@code body.datas}; the last one is also evaluated for alarms and pushes.
     * An {@link IOException} on one reading is logged and does not stop the remaining readings.
     */
    private void storeReadings(JSONObject body, String devCode, WasteGasManager wasteGasManager,
                               DeviceManager deviceManager, AlarmRecordManager alarmRecordManager) {
        JSONArray readings = (JSONArray) body.get("datas");
        for (int i = 0; i < readings.size(); i++) {
            try {
                JSONObject reading = (JSONObject) readings.get(i);
                String co = reading.get("CO").toString();
                String o2 = reading.get("O2").toString();
                String h2s = reading.get("H2S").toString();
                String ch4 = reading.get("CH4").toString();
                String upTime = reading.get("uptime").toString();
                Boolean isOpen = Boolean.valueOf(reading.get("liquidSwitch").toString());
                wasteGasManager.saveData(devCode, upTime, co, o2, h2s, ch4, isOpen);

                // Alarm evaluation and remote pushes happen only for the last reading of the batch.
                if (i == readings.size() - 1) {
                    evaluateLatestReading(devCode, upTime, co, o2, h2s, ch4, deviceManager, alarmRecordManager);
                }
            } catch (IOException e) {
                logger.error("--------ERROR:设备编号:" + devCode + ",存储数据:"
                        + readings.get(i).toString() + "失败-------", e);
            }
        }
    }

    /**
     * Clears the device-fault state, records a threshold alarm when needed and triggers the
     * optional remote pushes for the most recent reading of a batch.
     */
    private void evaluateLatestReading(String devCode, String upTime, String co, String o2, String h2s,
                                       String ch4, DeviceManager deviceManager,
                                       AlarmRecordManager alarmRecordManager) throws IOException {
        // Data arrived, so clear any outstanding device-fault alarm for this device.
        alarmRecordManager.cancelDeviceAlarm(devCode);
        alarmRecordManager.deleteByDevcode(devCode);

        String alarmValue = buildAlarmValue(co, o2, h2s, ch4);
        if (!NO_ALARM_VALUE.equals(alarmValue)) {
            int count = alarmRecordManager.isAlarm(devCode);
            // Clear historical alarms when isAlarm reports a count of zero.
            if (count == 0) {
                alarmRecordManager.cancelAlarm(devCode);
            }
            int alarmLimit = Integer.valueOf(Configure.getProperty("alarmCount", "3"));
            if (count >= alarmLimit) {
                // NOTE(review): as in the original flow, reaching the limit also skips the
                // reading push below - confirm that this is intended.
                return;
            }
            alarmRecordManager.saveData(devCode, DeviceTypeEnum.WasteGas.toString(), alarmValue,
                    String.valueOf(WasteGasAlarmEnum.OVER_THRESH.getIndex()), "0");

            // The alarm is forwarded once: on the record that brings the count up to the limit.
            if (count == alarmLimit - 1
                    && "true".equals(Configure.getProperty("synchronizeCenter.enabled", "false"))) {
                pushAlarmToCenter(devCode, upTime, deviceManager, alarmRecordManager);
            }
        }

        if ("true".equals(Configure.getProperty("synchronize.enabled", "false"))) {
            pushReading(devCode, co, o2, h2s, ch4);
        }
    }

    /**
     * Builds the alarm value: each gas contributes its raw reading when out of range and
     * {@code "*"} otherwise. Note the field order is CO, H2S, O2, CH4.
     *
     * @throws NumberFormatException if a reading is not a parsable float
     */
    private static String buildAlarmValue(String co, String o2, String h2s, String ch4) {
        String alarmCo = Float.parseFloat(co) > CO_MAX ? co : NO_ALARM;
        float oxygen = Float.parseFloat(o2);
        String alarmO2 = oxygen > O2_LOW && oxygen < O2_HIGH ? NO_ALARM : o2;
        String alarmH2s = Float.parseFloat(h2s) > H2S_MAX ? h2s : NO_ALARM;
        String alarmCh4 = Float.parseFloat(ch4) > CH4_MAX ? ch4 : NO_ALARM;
        return alarmCo + "," + alarmH2s + "," + alarmO2 + "," + alarmCh4;
    }

    /**
     * Asynchronously sends the over-threshold alarm to the URL configured as {@code sendCenterURL}
     * and, on a {@code 200} reply, marks the alarm record as sent with the returned event id.
     */
    private void pushAlarmToCenter(String devCode, final String upTime, DeviceManager deviceManager,
                                   final AlarmRecordManager alarmRecordManager) throws IOException {
        final Device device = deviceManager.getDeviceByDevCode(devCode, DeviceTypeEnum.WasteGas.toString());
        if (device == null) {
            // Presumably the lookup yields null for an unregistered device (TODO confirm). Without
            // this guard the task would die with an NPE on the worker thread, unseen by anyone.
            logger.warn("No waste-gas device found for devCode {}; alarm not pushed", devCode);
            return;
        }
        runDetached(new Runnable() {
            @Override
            public void run() {
                try {
                    String uuid = AlarmDataCenterUtilDTO.getUUID();
                    String resultMsg = AlarmDataCenterUtilDTO.sendMsg(Configure.getProperty("sendCenterURL"),
                            new AlarmDataCenterUtilDTO(
                                    uuid,
                                    device.getFactory(),
                                    "RJ0104",
                                    "RJ010410",
                                    DateUtils.DateFormat(upTime),
                                    "A",
                                    "1",
                                    "0",
                                    "1",
                                    device.getDevCode(),
                                    device.getDevCode(),
                                    device.getLongtitude(),
                                    device.getLatitude(),
                                    WasteGasAlarmEnum.OVER_THRESH.toString(),
                                    WasteGasAlarmEnum.OVER_THRESH.toString()));
                    // After a successful send, mark the alarm record as sent.
                    if (StringUtils.isNotBlank(resultMsg)) {
                        JSONObject reply = JSONObject.fromObject(resultMsg);
                        String code = reply.get("code").toString();
                        if ("200".equals(code)) {
                            JSONObject data = reply.getJSONObject("data");
                            alarmRecordManager.updateSend(device.getDevCode(), data.getString("eventId"));
                        }
                    }
                } catch (RuntimeException e) {
                    // The task's Future is discarded, so failures must be logged here.
                    logger.error("Pushing alarm for device " + device.getDevCode() + " failed", e);
                }
            }
        });
    }

    /** Asynchronously sends the four gas readings to the URL configured as {@code reportInfoURL}. */
    private void pushReading(final String devCode, final String co, final String o2, final String h2s,
                             final String ch4) {
        runDetached(new Runnable() {
            @Override
            public void run() {
                try {
                    Map<String, Object> dataMap = new HashMap<String, Object>();
                    Map<String, Object> paramMap = new HashMap<String, Object>();
                    dataMap.put("serviceId", DeviceUtilDTO.devTypeCodeMap.
                            get(DeviceTypeEnum.WasteGas.toString()));
                    paramMap.put("CO", co);
                    paramMap.put("O2", o2);
                    paramMap.put("H2S", h2s);
                    paramMap.put("CH4", ch4);
                    dataMap.put("paras", paramMap);
                    // NOTE(review): DateUtils.sdf4 looks like a shared formatter used from worker
                    // threads - confirm that it is safe for concurrent use.
                    DeviceUtilDTO.sendDataMsg(Configure.getProperty("reportInfoURL"),
                            new DeviceUtilDTO("", "false", "updata", "alarm", "", "",
                                    DateUtils.sdf4.format(new Date()), dataMap, devCode,
                                    DeviceTypeEnum.WasteGas.toString()));
                } catch (RuntimeException e) {
                    // The task's Future is discarded, so failures must be logged here.
                    logger.error("Pushing reading for device " + devCode + " failed", e);
                }
            }
        });
    }

    /**
     * Runs {@code task} once on its own worker thread after {@link #PUSH_DELAY_MS}.
     *
     * <p>The executor is shut down right after scheduling so its thread is released when the task
     * finishes; by default a scheduled executor still runs already-scheduled delayed tasks after
     * {@code shutdown()}. Without the shutdown every message would leave an idle thread behind.
     */
    private void runDetached(Runnable task) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            executor.schedule(task, PUSH_DELAY_MS, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdown();
        }
    }

    /** Marks the device configuration as applied when a waste-gas device confirms it. */
    private void confirmConfiguration(JSONObject json, JSONObject body, String devCode,
                                      DeviceConfigManager deviceConfigManager) {
        try {
            if (DeviceTypeEnum.WasteGas.name().equals(json.get("devType"))
                    && "WasteGasConfigSuccess".equals(body.get("bType"))) {
                deviceConfigManager.updateStatus(devCode);
            }
        } catch (Exception e) {
            logger.error("--------ERROR:设备编号:" + devCode + ",更新参数下发状态失败-------", e);
        }
    }

    /** Stores the IMEI and ICCID reported in a startup request. */
    private void storeIdentifiers(JSONObject body, String devCode, ImeiManager imeiManager) {
        try {
            String imei = body.get("imei").toString();
            String iccid = body.get("iccid").toString();
            imeiManager.saveData(devCode, imei, iccid);
        } catch (Exception e) {
            // This branch used to log the SetResponse failure text; it now names the real operation.
            logger.error("--------ERROR:设备编号:" + devCode + ",存储三码数据失败-------", e);
        }
    }
}