/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.well.mysql.sink; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.well.well.base.AbstractResponse; import org.well.well.manager.DeviceManager; import org.well.well.util.ResponseResolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.ArrayList; import java.util.List; public class WellSink extends AbstractSink implements Configurable { private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; public WellSink() { LOG.info("wellMysqlSink start..."); } public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { System.out.println("**********************"+string+"************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); batchSize = context.getInteger("batchSize", 100); } @Override public void start() { super.start(); LOG.info("--------wellMysqlSink start-------"); System.out.println("--------wellMysqlSink start-------"); } @Override public void stop() { super.stop(); LOG.info("--------wellMysqlSink stop-------"); } public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event; String content; List<String> actions = Lists.newArrayList(); transaction.begin(); try { for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) { content = new String(event.getBody()); actions.add(content); } else { result = Status.BACKOFF; break; } } if (actions.size() > 0) { for (String temp : actions) { LOG.info("--------wellMysqlSink接收数据:" + temp + "-------"); AbstractResponse resp = ResponseResolver.makeResponse(temp); if (resp == null) continue; resp.setAc(ac); resp.process(temp);//对应的消息处理 LOG.info("--------wellMysqlSink存库处理完!------"); } } transaction.commit(); } catch (Throwable e) { try { // transaction.rollback(); transaction.commit(); } catch (Exception e2) { LOG.error("Exception in rollback. Rollback might not have been" + "successful.", e2); } LOG.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); } finally { transaction.close(); } return result; } public static void main(String[] args) { ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); ac.start(); String temp = "{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"112018030001\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\": 90,\"datas\": [{\"uptime\":\"20180911222000\",\"level\":0.55}, {\"uptime\":\"20181111212001\",\"level\": 0.46}],\"logTime\":\"20190315010000\"},\"ts\":\"12345678901\"}"; // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412020110001\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20210624000000\"}],\"logTime\":\"20210624000000\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412020110001\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; // temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"71201900001\",\"mBody\":{\"datas\":[{\"longitude\":126.243324343,\"latitude\":39.2546546546,\"uptime\":\"20190809140900\"}],\"logTime\":\"20190809141012\",\"bType\":\"LocatorData\"},\"ts\":1565331012453}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"cell\":99,\"datas\":[{\"noiseVal\":84.7926,\"noiseFreq\":1421,\"uptime\":\"20191017141000\"},{\"noiseVal\":48.1797,\"noiseFreq\":212,\"uptime\":\"20190822141300\"},{\"noiseVal\":33.1398,\"noiseFreq\":553,\"uptime\":\"20190822141600\"},{\"noiseVal\":0.85,\"noiseFreq\":1049,\"uptime\":\"20190822141900\"},{\"noiseVal\":0.86,\"noiseFreq\":1545,\"uptime\":\"20191017142200\"}],\"logTime\":\"20190822141337\",\"bType\":\"NoiseDigData\"},\"ts\":1566454417130}"; // temp="{\"mType\":\"Data\",\"devType\":\"WasteGas\",\"devCode\":\"21201900001\",\"mBody\":{\"datas\":[{\"CO\":60,\"O2\":25,\"H2S\":15,\"CH4\":7,\"liquidSwitch\":true,\"uptime\":\"20191018163319\"}],\"logTime\":\"20191018151844\",\"bType\":\"WasteGasData\"},\"ts\":1571296724288}"; // temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"51201900001\",\"mBody\":{\"cell\":99,\"datas\":[{\"temperature\":.55,\"humidity\":0.2,\"uptime\":\"20191017150000\"}],\"logTime\":\"20191017150225\",\"bType\":\"TempHumiData\"},\"ts\":1571295745881}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidUltrasonicError\"],\"logTime\":\"20190710134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"31201900001\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"gas\":10,\"uptime\":\"20191017105400\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":10.8701172,\"uptime\":\"20191017135400\"}],\"logTime\":\"20191017140124\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; // temp="{\"mType\":\"Event\",\"devType\":\"TempHumi\",\"devCode\":\"51201900001\",\"mBody\":{\"eventType\":[\"TemperatureFail\",\"TemperatureError\",\"HumidityFail\",\"HumidityError\"],\"logTime\":\"20191021140118\",\"bType\":\"TempHumiEvent\"},\"ts\":1571292078959}"; // temp="{\"mType\":\"SetResponse\",\"devType\":\"NoiseDig\",\"devCode\":\"88888888881\",\"mBody\":{\"bType\":\"NoiseDigConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"WasteGas\",\"devCode\":\"08888888885\",\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CO\":0,\"O2\":1.1,\"H2S\":10000,\"CH4\":10000,\"liquidSwitch\":true,\"uptime\":\"20191217165124\"}],\"logTime\":\"20191217165124\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"1122222222212\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":98,\"datas\":[{\"level\":6.73,\"uptime\":\"20191217161000\"},{\"level\":5.72,\"uptime\":\"20191217162000\"},{\"level\":7.71,\"uptime\":\"20210406113000\"}],\"logTime\":\"20210406113000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"Noise\",\"devCode\":\"3120312313213\",\"mBody\":{\"bType\":\"NoiseData\",\"cell\":88,\"datas\":[{\"noiseVal\":1.83666,\"noiseFreq\":2040,\"uptime\":\"20200820125700\"}],\"logTime\":\"20200820123131\"},\"ts\":1556184691451}"; // temp="{\"mType\":\"Data\",\"devType\":\"Noise\",\"devCode\":\"212020000a\",\"mBody\":{\"cell\":99,\"datas\":[{\"noiseVal\":84.7926,\"noiseFreq\":1421,\"uptime\":\"20191017141000\"},{\"noiseVal\":48.1797,\"noiseFreq\":212,\"uptime\":\"20190822141300\"},{\"noiseVal\":33.1398,\"noiseFreq\":553,\"uptime\":\"20190822141600\"},{\"noiseVal\":0.85,\"noiseFreq\":1049,\"uptime\":\"20190822141900\"},{\"noiseVal\":4000.86,\"noiseFreq\":1545,\"uptime\":\"20191017142200\"}],\"logTime\":\"20190822141337\",\"bType\":\"NoiseData\"},\"ts\":1566454417130}"; // temp="{\"mType\":\"Data\",\"devType\":\"TempPressure\",\"devCode\":\"812019010001\",\"mBody\":{\"cell\":82,\"datas\":[{\"temperature\":12.7999992,\"pressure\":104.66,\"uptime\":\"20210401155800\"}],\"logTime\":\"20210401000148\",\"bType\":\"TempPressureData\"},\"ts\":1617206508232}"; // List<String> arr= new ArrayList<String>(); // temp="{\"mType\":\"Data\",\"devType\":\"Tube\",\"devCode\":\"1111111111\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"gas\":10,\"uptime\":\"20191017105400\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":26.8701172,\"uptime\":\"20210813115400\"}],\"logTime\":\"20210813105400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; // arr.add(temp); // temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"3123421342314\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"gas\":10,\"uptime\":\"20191017105400\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":50.8701172,\"uptime\":\"20210813105700\"}],\"logTime\":\"20210813105400\",\"bType\":\"LGData\"},\"ts\":1571292084960}"; // arr.add(temp); // temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"32201900001\",\"mBody\":{\"cell\":95,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20191017085400\"},{\"gas\":0.080566406,\"uptime\":\"20191017095400\"},{\"liquid\":10,\"uptime\":\"20210813105800\"},{\"gas\":0.09990235,\"uptime\":\"20191017115400\"},{\"gas\":0.070898436,\"uptime\":\"20191017125400\"},{\"gas\":10.8701172,\"uptime\":\"20210813105700\"}],\"logTime\":\"20210813105400\",\"bType\":\"LGData\"},\"ts\":1571292084960}"; // arr.add(temp); // temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"322019010222\",\"mBody\":{\"cell\":92,\"pci\":60,\"rsrp\":-87,\"snr\":14,\"datas\":[{\"gas\":0.0,\"uptime\":\"20210819000900\"}],\"logTime\":\"20210819001029\",\"bType\":\"LGData\"},\"ts\":1629303029014}"; // arr.add(temp); temp="{\"mType\":\"Data\",\"devType\":\"Tube\",\"devCode\":\"342020030014\",\"mBody\":{\"cell\":15,\"pci\":94,\"rsrp\":-103,\"snr\":7,\"datas\":[{\"gas\":0.0,\"uptime\":\"20211128000000\"}],\"logTime\":\"20211210174648\",\"bType\":\"TubeData\"},\"ts\":1639129608475}"; temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"322021050010\",\"mBody\":{\"cell\":91,\"pci\":279,\"rsrp\":-105,\"snr\":10,\"datas\":[{\"gas\":40.0,\"uptime\":\"20220113175000\"},{\"liquid\":1040.07336,\"uptime\":\"20220113175000\"}],\"logTime\":\"20220113175144\",\"bType\":\"LGData\"},\"ts\":1642067504321}"; temp="{\"mType\":\"Data\",\"devType\":\"LG\",\"devCode\":\"322021050019\",\"mBody\":{\"cell\":83,\"pci\":100,\"rsrp\":-85,\"snr\":24,\"datas\":[{\"gas\":0.599999964,\"uptime\":\"20220513020200\"},{\"gas\":0.599999964,\"uptime\":\"20220513030200\"},{\"liquid\":1108.392,\"uptime\":\"20220513020200\"},{\"liquid\":1112.47058,\"uptime\":\"20220513030200\"}],\"logTime\":\"20220513030400\",\"bType\":\"LGData\"},\"ts\":1652382240656}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); // for(String dd:arr){ // resp.process(dd); // } resp.process(temp); // DeviceManager deviceManager = ac.getBean(DeviceManager.class); // deviceManager.updateDeviceStatus(); } }