/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.well.mysql.sink;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.well.well.base.AbstractResponse;
import org.well.well.manager.DeviceManager;
import org.well.well.util.ResponseResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Flume sink that drains JSON device messages from its channel and hands each one to the
 * {@link AbstractResponse} resolved for it by {@link ResponseResolver#makeResponse(String)}.
 *
 * <p>Configuration properties read in {@link #configure(Context)}:
 * <ul>
 *   <li>{@code batchSize} - maximum number of events taken per {@link #process()} call
 *       (default 100; non-positive values fall back to the default);</li>
 *   <li>{@code charset} - charset used to decode event bodies (default UTF-8).</li>
 * </ul>
 *
 * <p>Delivery semantics: a batch whose events were taken successfully is always committed, even
 * when handling one of its messages fails, so failing messages are dropped rather than
 * redelivered. NOTE(review): this keeps the original behaviour (rollback had been replaced by
 * commit) - presumably to avoid endless redelivery of unprocessable messages; confirm.
 */
public class WellSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(WellSink.class);

    /** Spring configuration files used to build the context handed to every response. */
    private static final String SPRING_CONFIG_LOCATION = "classpath:wellSensor/*.xml";
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final String DEFAULT_CHARSET = "UTF-8";

    private int batchSize = DEFAULT_BATCH_SIZE;
    private java.nio.charset.Charset bodyCharset = java.nio.charset.Charset.forName(DEFAULT_CHARSET);
    private ClassPathXmlApplicationContext ac = null;

    public WellSink() {
        LOG.info("wellMysqlSink start...");
    }

    /**
     * Reads the sink settings and (re)builds the Spring application context.
     *
     * @param context Flume configuration for this sink
     * @throws IllegalArgumentException if the configured {@code charset} name is not supported
     *                                  (thrown by {@link java.nio.charset.Charset#forName(String)})
     */
    @Override
    public void configure(Context context) {
        // Dump the class path entries for troubleshooting. The platform separator is used
        // instead of a hard-coded ";" so the dump is also split correctly where it is ":".
        String separator = System.getProperty("path.separator");
        String classPath = System.getProperty("java.class.path");
        for (String entry : classPath.split(java.util.regex.Pattern.quote(separator))) {
            LOG.info("**********************{}************************", entry);
        }

        int configuredBatchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
        if (configuredBatchSize <= 0) {
            // A non-positive batch would make process() take nothing and report READY forever.
            LOG.warn("Invalid batchSize {}; falling back to {}", configuredBatchSize, DEFAULT_BATCH_SIZE);
            configuredBatchSize = DEFAULT_BATCH_SIZE;
        }
        batchSize = configuredBatchSize;
        bodyCharset = java.nio.charset.Charset.forName(context.getString("charset", DEFAULT_CHARSET));

        // Release a context left over from an earlier configure() call before replacing it.
        closeContext();
        ac = new ClassPathXmlApplicationContext(new String[]{SPRING_CONFIG_LOCATION});
    }

    /**
     * Starts the sink. The Spring context is rebuilt here if it is missing, which happens when
     * the sink is started again after {@link #stop()} without an intervening configure call.
     */
    @Override
    public void start() {
        if (ac == null) {
            ac = new ClassPathXmlApplicationContext(new String[]{SPRING_CONFIG_LOCATION});
        }
        super.start();
        LOG.info("--------wellMysqlSink start-------");
    }

    /** Stops the sink and closes the Spring context created for it. */
    @Override
    public void stop() {
        super.stop();
        closeContext();
        LOG.info("--------wellMysqlSink stop-------");
    }

    /**
     * Takes up to {@code batchSize} events from the channel in one transaction and handles each
     * message body.
     *
     * @return {@link Status#BACKOFF} when the channel ran empty before the batch was full,
     *         {@link Status#READY} otherwise
     * @throws EventDeliveryException if handling at least one message failed; the batch has
     *                                already been committed at that point and the first failure
     *                                is attached as the cause
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<String> actions = Lists.newArrayList();
        Exception handlingFailure = null;
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty: handle what we have and ask the runner to back off.
                    result = Status.BACKOFF;
                    break;
                }
                actions.add(new String(event.getBody(), bodyCharset));
            }
            handlingFailure = handleMessages(actions);
            // Committed even if some messages failed, see the class documentation.
            transaction.commit();
        } catch (Throwable e) {
            // Only channel/commit failures and Errors reach this point; the events were not
            // consumed reliably, so they are returned to the channel.
            LOG.error("Failed to take or commit the batch. Rolling the transaction back.", e);
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been successful.", e2);
            }
            throw Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        if (handlingFailure != null) {
            throw new EventDeliveryException(
                    "Failed to handle at least one message; the batch was committed anyway.",
                    handlingFailure);
        }
        return result;
    }

    /**
     * Handles every message independently so one failing message does not prevent the rest of
     * the batch from being handled.
     *
     * @param messages decoded event bodies, in the order they were taken
     * @return the first exception raised while handling, or {@code null} if none was raised
     */
    private Exception handleMessages(List<String> messages) {
        Exception firstFailure = null;
        for (String message : messages) {
            try {
                LOG.info("--------wellMysqlSink接收数据：{}-------", message);
                AbstractResponse resp = ResponseResolver.makeResponse(message);
                if (resp == null) {
                    LOG.warn("ResponseResolver returned no response for the message above; skipping it.");
                    continue;
                }
                resp.setAc(ac);
                resp.process(message); // handling specific to this message
                LOG.info("--------wellMysqlSink存库处理完！------");
            } catch (Exception e) {
                LOG.error("Failed to handle message: " + message, e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        return firstFailure;
    }

    /** Closes the Spring context if one is open; close failures are logged, not propagated. */
    private void closeContext() {
        if (ac == null) {
            return;
        }
        try {
            ac.close();
        } catch (RuntimeException e) {
            LOG.warn("Failed to close the Spring application context.", e);
        } finally {
            ac = null;
        }
    }

    /**
     * Manual harness: builds the Spring context and pushes a single JSON message through the
     * same resolve/process path the sink uses.
     *
     * @param args optional; when present, {@code args[0]} is used as the message instead of the
     *             built-in sample
     */
    public static void main(String[] args) {

        ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext(
                new String[]{SPRING_CONFIG_LOCATION});
        try {
            ac.start();

            // Alternative sample payloads kept from the original harness. To try one, pass it
            // as the first program argument or swap it in for the default below.
            //
            // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"112018030001\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\": 90,\"datas\": [{\"uptime\":\"20180911222000\",\"level\":0.55}, {\"uptime\":\"20181111212001\",\"level\": 0.46}],\"logTime\":\"20190315010000\"},\"ts\":\"12345678901\"}";
            // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412020110001\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20210624000000\"}],\"logTime\":\"20210624000000\",\"bType\":\"WellData\"},\"ts\":1559665802828}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}";
            // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}";
            // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412020110001\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"71201900001\",\"mBody\":{\"datas\":[{\"longitude\":126.243324343,\"latitude\":39.2546546546,\"uptime\":\"20190809140900\"}],\"logTime\":\"20190809141012\",\"bType\":\"LocatorData\"},\"ts\":1565331012453}";
            // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"cell\":99,\"datas\":[{\"noiseVal\":84.7926,\"noiseFreq\":1421,\"uptime\":\"20191017141000\"},{\"noiseVal\":48.1797,\"noiseFreq\":212,\"uptime\":\"20190822141300\"},{\"noiseVal\":33.1398,\"noiseFreq\":553,\"uptime\":\"20190822141600\"},{\"noiseVal\":0.85,\"noiseFreq\":1049,\"uptime\":\"20190822141900\"},{\"noiseVal\":0.86,\"noiseFreq\":1545,\"uptime\":\"20191017142200\"}],\"logTime\":\"20190822141337\",\"bType\":\"NoiseDigData\"},\"ts\":1566454417130}";
            // temp="{\"mType\":\"Data\",\"devType\":\"WasteGas\",\"devCode\":\"21201900001\",\"mBody\":{\"datas\":[{\"CO\":60,\"O2\":25,\"H2S\":15,\"CH4\":7,\"liquidSwitch\":true,\"uptime\":\"20191018163319\"}],\"logTime\":\"20191018151844\",\"bType\":\"WasteGasData\"},\"ts\":1571296724288}";
            // temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"51201900001\",\"mBody\":{\"cell\":99,\"datas\":[{\"temperature\":.55,\"humidity\":0.2,\"uptime\":\"20191017150000\"}],\"logTime\":\"20191017150225\",\"bType\":\"TempHumiData\"},\"ts\":1571295745881}";
            // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidUltrasonicError\"],\"logTime\":\"20190710134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"31201900001\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"gas\":10,\"uptime\":\"20191017105400\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":10.8701172,\"uptime\":\"20191017135400\"}],\"logTime\":\"20191017140124\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}";
            // temp="{\"mType\":\"Event\",\"devType\":\"TempHumi\",\"devCode\":\"51201900001\",\"mBody\":{\"eventType\":[\"TemperatureFail\",\"TemperatureError\",\"HumidityFail\",\"HumidityError\"],\"logTime\":\"20191021140118\",\"bType\":\"TempHumiEvent\"},\"ts\":1571292078959}";
            // temp="{\"mType\":\"SetResponse\",\"devType\":\"NoiseDig\",\"devCode\":\"88888888881\",\"mBody\":{\"bType\":\"NoiseDigConfigSuccess\"},\"ts\":1556182310514}";
            // temp="{\"mType\":\"Data\",\"devType\":\"WasteGas\",\"devCode\":\"08888888885\",\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CO\":0,\"O2\":1.1,\"H2S\":10000,\"CH4\":10000,\"liquidSwitch\":true,\"uptime\":\"20191217165124\"}],\"logTime\":\"20191217165124\"},\"ts\":1556182310514}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"1122222222212\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":98,\"datas\":[{\"level\":6.73,\"uptime\":\"20191217161000\"},{\"level\":5.72,\"uptime\":\"20191217162000\"},{\"level\":7.71,\"uptime\":\"20210406113000\"}],\"logTime\":\"20210406113000\"},\"ts\":1556186030842}";
            // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Noise\",\"devCode\":\"3120312313213\",\"mBody\":{\"bType\":\"NoiseData\",\"cell\":88,\"datas\":[{\"noiseVal\":1.83666,\"noiseFreq\":2040,\"uptime\":\"20200820125700\"}],\"logTime\":\"20200820123131\"},\"ts\":1556184691451}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Noise\",\"devCode\":\"212020000a\",\"mBody\":{\"cell\":99,\"datas\":[{\"noiseVal\":84.7926,\"noiseFreq\":1421,\"uptime\":\"20191017141000\"},{\"noiseVal\":48.1797,\"noiseFreq\":212,\"uptime\":\"20190822141300\"},{\"noiseVal\":33.1398,\"noiseFreq\":553,\"uptime\":\"20190822141600\"},{\"noiseVal\":0.85,\"noiseFreq\":1049,\"uptime\":\"20190822141900\"},{\"noiseVal\":4000.86,\"noiseFreq\":1545,\"uptime\":\"20191017142200\"}],\"logTime\":\"20190822141337\",\"bType\":\"NoiseData\"},\"ts\":1566454417130}";
            // temp="{\"mType\":\"Data\",\"devType\":\"TempPressure\",\"devCode\":\"812019010001\",\"mBody\":{\"cell\":82,\"datas\":[{\"temperature\":12.7999992,\"pressure\":104.66,\"uptime\":\"20210401155800\"}],\"logTime\":\"20210401000148\",\"bType\":\"TempPressureData\"},\"ts\":1617206508232}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Tube\",\"devCode\":\"1111111111\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"gas\":10,\"uptime\":\"20191017105400\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":26.8701172,\"uptime\":\"20210813115400\"}],\"logTime\":\"20210813105400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}";
            // temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"3123421342314\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"gas\":10,\"uptime\":\"20191017105400\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":50.8701172,\"uptime\":\"20210813105700\"}],\"logTime\":\"20210813105400\",\"bType\":\"LGData\"},\"ts\":1571292084960}";
            // temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"32201900001\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"liquid\":10,\"uptime\":\"20210813105800\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":10.8701172,\"uptime\":\"20210813105700\"}],\"logTime\":\"20210813105400\",\"bType\":\"LGData\"},\"ts\":1571292084960}";
            // temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"322019010222\",\"mBody\":{\"cell\":92,\"pci\":60,\"rsrp\":-87,\"snr\":14,\"datas\":[{\"gas\":0.0,\"uptime\":\"20210819000900\"}],\"logTime\":\"20210819001029\",\"bType\":\"LGData\"},\"ts\":1629303029014}";
            // temp="{\"mType\":\"Data\",\"devType\":\"Tube\",\"devCode\":\"342020030014\",\"mBody\":{\"cell\":15,\"pci\":94,\"rsrp\":-103,\"snr\":7,\"datas\":[{\"gas\":0.0,\"uptime\":\"20211128000000\"}],\"logTime\":\"20211210174648\",\"bType\":\"TubeData\"},\"ts\":1639129608475}";
            // temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"322021050010\",\"mBody\":{\"cell\":91,\"pci\":279,\"rsrp\":-105,\"snr\":10,\"datas\":[{\"gas\":40.0,\"uptime\":\"20220113175000\"},{\"liquid\":1040.07336,\"uptime\":\"20220113175000\"}],\"logTime\":\"20220113175144\",\"bType\":\"LGData\"},\"ts\":1642067504321}";
            //
            // Alternative manual check kept from the original harness:
            // DeviceManager deviceManager = ac.getBean(DeviceManager.class);
            // deviceManager.updateDeviceStatus();

            // Default sample: the payload the original harness ended up sending.
            String temp = "{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"322021050019\",\"mBody\":{\"cell\":83,\"pci\":100,\"rsrp\":-85,\"snr\":24,\"datas\":[{\"gas\":0.599999964,\"uptime\":\"20220513020200\"},{\"gas\":0.599999964,\"uptime\":\"20220513030200\"},{\"liquid\":1108.392,\"uptime\":\"20220513020200\"},{\"liquid\":1112.47058,\"uptime\":\"20220513030200\"}],\"logTime\":\"20220513030400\",\"bType\":\"LGData\"},\"ts\":1652382240656}";
            if (args != null && args.length > 0) {
                temp = args[0];
            }

            AbstractResponse resp = ResponseResolver.makeResponse(temp);
            if (resp == null) {
                // process() treats null the same way: the message cannot be handled.
                LOG.error("ResponseResolver returned no response for message: {}", temp);
                return;
            }
            resp.setAc(ac);
            resp.process(temp);
        } finally {
            // Always release the context, including when handling throws.
            ac.close();
        }
    }
}
  
