diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/src/main/java/org/well/well/kafka/StandardDataUtils.java b/src/main/java/org/well/well/kafka/StandardDataUtils.java index dba65a8..a4e4b84 100644 --- a/src/main/java/org/well/well/kafka/StandardDataUtils.java +++ b/src/main/java/org/well/well/kafka/StandardDataUtils.java @@ -14,10 +14,12 @@ List> standardStatusFomateList = new ArrayList<>(); if (realParam != null) { for (int i = 0; i < realParam.length; i++) { - Map standardStatusMap = new HashMap<>(); - standardStatusMap.put("Key", standardkeyParm[i]); - standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); - standardStatusFomateList.add(standardStatusMap); + if(jsonObject.containsKey(realParam[i].toString())){ + Map standardStatusMap = new HashMap<>(); + standardStatusMap.put("Key", standardkeyParm[i]); + standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); + standardStatusFomateList.add(standardStatusMap); + } } } if (!CollectionUtils.isEmpty(appenList)) { diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/src/main/java/org/well/well/kafka/StandardDataUtils.java b/src/main/java/org/well/well/kafka/StandardDataUtils.java index dba65a8..a4e4b84 100644 --- a/src/main/java/org/well/well/kafka/StandardDataUtils.java +++ b/src/main/java/org/well/well/kafka/StandardDataUtils.java @@ -14,10 +14,12 @@ List> standardStatusFomateList = new ArrayList<>(); if (realParam != null) { for (int i = 0; i < realParam.length; i++) { - Map standardStatusMap = new HashMap<>(); - standardStatusMap.put("Key", standardkeyParm[i]); - standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); - standardStatusFomateList.add(standardStatusMap); + if(jsonObject.containsKey(realParam[i].toString())){ + Map standardStatusMap = new HashMap<>(); + standardStatusMap.put("Key", standardkeyParm[i]); + standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); + standardStatusFomateList.add(standardStatusMap); + } } } if (!CollectionUtils.isEmpty(appenList)) { diff --git a/src/main/resources/wellSensor/77042.jaas.conf b/src/main/resources/wellSensor/77042.jaas.conf deleted file mode 100644 index 3abd31a..0000000 --- a/src/main/resources/wellSensor/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/src/main/java/org/well/well/kafka/StandardDataUtils.java b/src/main/java/org/well/well/kafka/StandardDataUtils.java index dba65a8..a4e4b84 100644 --- a/src/main/java/org/well/well/kafka/StandardDataUtils.java +++ b/src/main/java/org/well/well/kafka/StandardDataUtils.java @@ -14,10 +14,12 @@ List> standardStatusFomateList = new ArrayList<>(); if (realParam != null) { for (int i = 0; i < realParam.length; i++) { - Map standardStatusMap = new HashMap<>(); - standardStatusMap.put("Key", standardkeyParm[i]); - standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); - standardStatusFomateList.add(standardStatusMap); + if(jsonObject.containsKey(realParam[i].toString())){ + Map standardStatusMap = new HashMap<>(); + standardStatusMap.put("Key", standardkeyParm[i]); + standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); + standardStatusFomateList.add(standardStatusMap); + } } } if (!CollectionUtils.isEmpty(appenList)) { diff --git a/src/main/resources/wellSensor/77042.jaas.conf b/src/main/resources/wellSensor/77042.jaas.conf deleted file mode 100644 index 3abd31a..0000000 --- a/src/main/resources/wellSensor/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/wellSensor/consumer.properties b/src/main/resources/wellSensor/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/wellSensor/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/src/main/java/org/well/well/kafka/StandardDataUtils.java b/src/main/java/org/well/well/kafka/StandardDataUtils.java index dba65a8..a4e4b84 100644 --- a/src/main/java/org/well/well/kafka/StandardDataUtils.java +++ b/src/main/java/org/well/well/kafka/StandardDataUtils.java @@ -14,10 +14,12 @@ List> standardStatusFomateList = new ArrayList<>(); if (realParam != null) { for (int i = 0; i < realParam.length; i++) { - Map standardStatusMap = new HashMap<>(); - standardStatusMap.put("Key", standardkeyParm[i]); - standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); - standardStatusFomateList.add(standardStatusMap); + if(jsonObject.containsKey(realParam[i].toString())){ + Map standardStatusMap = new HashMap<>(); + standardStatusMap.put("Key", standardkeyParm[i]); + standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); + standardStatusFomateList.add(standardStatusMap); + } } } if (!CollectionUtils.isEmpty(appenList)) { diff --git a/src/main/resources/wellSensor/77042.jaas.conf b/src/main/resources/wellSensor/77042.jaas.conf deleted file mode 100644 index 3abd31a..0000000 --- a/src/main/resources/wellSensor/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/wellSensor/consumer.properties b/src/main/resources/wellSensor/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/wellSensor/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/wellSensor/kafkaSecurityMode b/src/main/resources/wellSensor/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/wellSensor/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/src/main/java/org/well/well/kafka/StandardDataUtils.java b/src/main/java/org/well/well/kafka/StandardDataUtils.java index dba65a8..a4e4b84 100644 --- a/src/main/java/org/well/well/kafka/StandardDataUtils.java +++ b/src/main/java/org/well/well/kafka/StandardDataUtils.java @@ -14,10 +14,12 @@ List> standardStatusFomateList = new ArrayList<>(); if (realParam != null) { for (int i = 0; i < realParam.length; i++) { - Map standardStatusMap = new HashMap<>(); - standardStatusMap.put("Key", standardkeyParm[i]); - standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); - standardStatusFomateList.add(standardStatusMap); + if(jsonObject.containsKey(realParam[i].toString())){ + Map standardStatusMap = new HashMap<>(); + standardStatusMap.put("Key", standardkeyParm[i]); + standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); + standardStatusFomateList.add(standardStatusMap); + } } } if (!CollectionUtils.isEmpty(appenList)) { diff --git a/src/main/resources/wellSensor/77042.jaas.conf b/src/main/resources/wellSensor/77042.jaas.conf deleted file mode 100644 index 3abd31a..0000000 --- a/src/main/resources/wellSensor/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/wellSensor/consumer.properties b/src/main/resources/wellSensor/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/wellSensor/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/wellSensor/kafkaSecurityMode b/src/main/resources/wellSensor/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/wellSensor/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/src/main/resources/wellSensor/krb5.conf b/src/main/resources/wellSensor/krb5.conf deleted file mode 100644 index 003c6c7..0000000 --- a/src/main/resources/wellSensor/krb5.conf +++ /dev/null @@ -1,48 +0,0 @@ -[kdcdefaults] -kdc_ports = 192.168.65.19:21732 -kdc_tcp_ports = "" - -[libdefaults] -default_realm = HADOOP.COM -kdc_timeout = 2500 -clockskew = 300 -use_dns_lookup = 0 -udp_preference_limit = 1465 -max_retries = 5 -dns_lookup_kdc = false -dns_lookup_realm = false -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -allow_extend_version = false -default_ccache_name = FILE:/tmp//krb5cc_%{uid} - -[realms] -HADOOP.COM = { -kdc = 192.168.65.19:21732 -kdc = 192.168.65.18:21732 -admin_server = 192.168.65.19:21730 -admin_server = 192.168.65.18:21730 -kpasswd_server = 192.168.65.19:21731 -kpasswd_server = 192.168.65.18:21731 -kpasswd_port = 21731 -kadmind_port = 21730 -kadmind_listen = 192.168.65.19:21730 -kpasswd_listen = 192.168.65.19:21731 -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/kadm5.acl -dict_file = /opt/huawei/Bigdata/common/runtime/security/weakPasswdDic/weakPasswdForKdc.ini -key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/.k5.HADOOP.COM -} - -[domain_realm] -.hadoop.com = HADOOP.COM - -[logging] -kdc = SYSLOG:INFO:DAEMON -admin_server = SYSLOG:INFO:DAEMON -default = SYSLOG:NOTICE:DAEMON diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/src/main/java/org/well/well/kafka/StandardDataUtils.java b/src/main/java/org/well/well/kafka/StandardDataUtils.java index dba65a8..a4e4b84 100644 --- a/src/main/java/org/well/well/kafka/StandardDataUtils.java +++ b/src/main/java/org/well/well/kafka/StandardDataUtils.java @@ -14,10 +14,12 @@ List> standardStatusFomateList = new ArrayList<>(); if (realParam != null) { for (int i = 0; i < realParam.length; i++) { - Map standardStatusMap = new HashMap<>(); - standardStatusMap.put("Key", standardkeyParm[i]); - standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); - standardStatusFomateList.add(standardStatusMap); + if(jsonObject.containsKey(realParam[i].toString())){ + Map standardStatusMap = new HashMap<>(); + standardStatusMap.put("Key", standardkeyParm[i]); + standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); + standardStatusFomateList.add(standardStatusMap); + } } } if (!CollectionUtils.isEmpty(appenList)) { diff --git a/src/main/resources/wellSensor/77042.jaas.conf b/src/main/resources/wellSensor/77042.jaas.conf deleted file mode 100644 index 3abd31a..0000000 --- a/src/main/resources/wellSensor/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/wellSensor/consumer.properties b/src/main/resources/wellSensor/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/wellSensor/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/wellSensor/kafkaSecurityMode b/src/main/resources/wellSensor/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/wellSensor/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/src/main/resources/wellSensor/krb5.conf b/src/main/resources/wellSensor/krb5.conf deleted file mode 100644 index 003c6c7..0000000 --- a/src/main/resources/wellSensor/krb5.conf +++ /dev/null @@ -1,48 +0,0 @@ -[kdcdefaults] -kdc_ports = 192.168.65.19:21732 -kdc_tcp_ports = "" - -[libdefaults] -default_realm = HADOOP.COM -kdc_timeout = 2500 -clockskew = 300 -use_dns_lookup = 0 -udp_preference_limit = 1465 -max_retries = 5 -dns_lookup_kdc = false -dns_lookup_realm = false -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -allow_extend_version = false -default_ccache_name = FILE:/tmp//krb5cc_%{uid} - -[realms] -HADOOP.COM = { -kdc = 192.168.65.19:21732 -kdc = 192.168.65.18:21732 -admin_server = 192.168.65.19:21730 -admin_server = 192.168.65.18:21730 -kpasswd_server = 192.168.65.19:21731 -kpasswd_server = 192.168.65.18:21731 -kpasswd_port = 21731 -kadmind_port = 21730 -kadmind_listen = 192.168.65.19:21730 -kpasswd_listen = 192.168.65.19:21731 -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/kadm5.acl -dict_file = /opt/huawei/Bigdata/common/runtime/security/weakPasswdDic/weakPasswdForKdc.ini -key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/.k5.HADOOP.COM -} - -[domain_realm] -.hadoop.com = HADOOP.COM - -[logging] -kdc = SYSLOG:INFO:DAEMON -admin_server = SYSLOG:INFO:DAEMON -default = SYSLOG:NOTICE:DAEMON diff --git a/src/main/resources/wellSensor/producer.properties b/src/main/resources/wellSensor/producer.properties deleted file mode 100644 index 5e6446a..0000000 --- a/src/main/resources/wellSensor/producer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -acks = 1 -bootstrap.servers = 192.168.65.16:21007,192.168.65.15:21007,192.168.65.14:21007 -sasl.kerberos.service.name = kafka diff --git a/pom.xml b/pom.xml index 3a7d7a4..742aff8 100644 --- a/pom.xml +++ b/pom.xml @@ -282,25 +282,25 @@ - org.apache.kafka - kafka-streams - ${kafka.version} - + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-clients - - - org.slf4j - slf4j-api - - - org.apache.kafka - connect-json - - - + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + diff --git a/src/main/java/org/well/mysql/sink/WellSink.java b/src/main/java/org/well/mysql/sink/WellSink.java index b363729..8c494b7 100644 --- a/src/main/java/org/well/mysql/sink/WellSink.java +++ b/src/main/java/org/well/mysql/sink/WellSink.java @@ -36,13 +36,15 @@ private Logger LOG = LoggerFactory.getLogger(WellSink.class); private int batchSize; private ClassPathXmlApplicationContext ac = null; + public WellSink() { LOG.info("wellMysqlSink start..."); } + public void configure(Context context) { String s[] = System.getProperty("java.class.path").split(";"); for (String string : s) { - System.out.println("**********************"+string+"************************"); + System.out.println("**********************" + string + "************************"); } ac = new ClassPathXmlApplicationContext( new String[]{"classpath:wellSensor/*.xml"}); @@ -122,7 +124,7 @@ // temp="{\"mType\":\"Event\",\"devType\":\"Concentrator\",\"devCode\":\"00003\",\"mBody\":{\"logTime\":\"20190605002024\",\"bType\":\"ConcentratorOnline\"},\"ts\":1559665224343}"; - temp="{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; + temp = "{\"mType\":\"Data\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"datas\":[{\"value\":\"00\",\"uptime\":\"20190605000000\"}],\"logTime\":\"201906010003002\",\"bType\":\"WellData\"},\"ts\":1559665802828}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"cell\":9.9,\"datas\":[{\"level\":1.1892,\"uptime\":\"20191213000000\"}],\"logTime\":\"20190502000125\",\"bType\":\"LiquidData\"},\"ts\":1556726485336}"; // temp="{\"mType\":\"Event\",\"devType\":\"Liquid\",\"devCode\":\"11201900001\",\"mBody\":{\"eventType\":[\"LiquidPressureError\"],\"logTime\":\"20190510134635\",\"bType\":\"LiquidEvent\"},\"ts\":1557467195358}"; // temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019010212\",\"mBody\":{\"eventType\":[\"WellLowBatteryAlarm\"],\"logTime\":\"20190624114710\",\"bType\":\"WellEvent\"},\"ts\":1560484030810}"; @@ -140,12 +142,14 @@ // temp="{\"mType\":\"SetResponse\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidConfigSuccess\"},\"ts\":1556182310514}"; // temp="{\"mType\":\"Data\",\"devType\":\"Liquid\",\"devCode\":\"12121212125\",\"mBody\":{\"bType\":\"LiquidData\",\"cell\":97,\"datas\":[{\"level\":4,\"uptime\":\"20191219150000\"},{\"level\":4,\"uptime\":\"20191219151000\"},{\"level\":6.5,\"uptime\":\"20191219152000\"}],\"logTime\":\"20191219152000\"},\"ts\":1556186030842}"; // temp="{\"mType\":\"Data\",\"devType\":\"NoiseDig\",\"devCode\":\"14141414146\",\"mBody\":{\"bType\":\"NoiseDigData\",\"cell\":88,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"noiseVal\":60,\"noiseFreq\":50,\"uptime\":\"20200109123131\"}],\"logTime\":\"20200119123131\"},\"ts\":1556184691451}"; - temp="{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20191017154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + // temp="{\"mType\":\"Data\",\"devType\":\"Methane\",\"devCode\":\"312020011111\",\"mBody\":{\"cell\":95,\"pci\":100,\"rsrp\":50,\"snr\":20,\"datas\":[{\"gas\":0.064453125,\"uptime\":\"20200407085400\"},{\"gas\":0.080566406,\"uptime\":\"20200407085400\"},{\"gas\":25,\"uptime\":\"20200407085400\"},{\"gas\":0.09990235,\"uptime\":\"20200407085400\"},{\"gas\":0.070898436,\"uptime\":\"20200407085400\"},{\"gas\":60,\"uptime\":\"20200408085400\"}],\"logTime\":\"20200407085400\",\"bType\":\"MethaneData\"},\"ts\":1571292084960}"; - -// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}"; - +// temp="{\"mType\":\"Data\",\"devType\":\"Locator\",\"devCode\":\"642019010387\",\"mBody\":{\"datas\":[{\"longitude\":0.0,\"latitude\":0.0,\"uptime\":\"20200118111000\"}],\"logTime\":\"20200118111006\",\"bType\":\"LocatorData\"},\"ts\":1579317006078}";; // temp="{\"Status\":\"[{\\\"Value\\\":7.8876,\\\"Key\\\":\\\"PH\\\"},{\\\"Value\\\":28.0265,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":0.1994,\\\"Key\\\":\\\"Turb\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"Cond\\\"},{\\\"Value\\\":0.5252,\\\"Key\\\":\\\"DO\\\"},{\\\"Value\\\":0,\\\"Key\\\":\\\"COD\\\"},{\\\"Value\\\":2746.4216,\\\"Key\\\":\\\"AN\\\"},{\\\"Value\\\":1.0002,\\\"Key\\\":\\\"TP\\\"},{\\\"Value\\\":1.4385,\\\"Key\\\":\\\"TN\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"devType\":\"WaterQuality\",\"LogTime\":\"2020-06-30 16:21:36\",\"DevID\":\"W1L30Z\",\"Provider\":\"KaiNa\"}"; + temp = "{\"devCode\":\"6K45QC\",\"devType\":\"WasteGas\",\"kafkaDataFlag\":true,\"mBody\":{\"bType\":\"WasteGasData\",\"datas\":[{\"CH4\":2.7788,\"CO\":0.0,\"H2S\":0.0,\"O2\":19.8548,\"liquidSwitch\":\"0.0\",\"power\":100.0,\"uptime\":\"20230529160657\"}],\"logTime\":\"20230529160657\"},\"mType\":\"Data\",\"ts\":0}"; +// temp = "{\"mType\":\"Event\",\"devType\":\"Well\",\"devCode\":\"412019122104\",\"mBody\":{\"eventType\":[\"WellOpenAlarm\"],\"logTime\":\"20230517154056\",\"bType\":\"WellEvent\"},\"ts\":1571298056241}"; + temp="{\"mType\":\"Data\",\"devType\":\"TempHumi\",\"devCode\":\"842019010610\",\"mBody\":{\"cell\":59,\"datas\":[{\"temperature\":25.1,\"uptime\":\"20230218060000\"}],\"logTime\":\"20230531084649\",\"bType\":\"TempHumiData\"},\"ts\":1685494009613}"; + temp="{\"devCode\":\"63CSS3\",\"devType\":\"Well\",\"mBody\":{\"bType\":\"WellData\",\"datas\":[{\"uptime\":\"20230531163422\",\"value\":\"00\"}],\"logTime\":\"20230531163422\"},\"mType\":\"Data\",\"ts\":0}"; AbstractResponse resp = ResponseResolver.makeResponse(temp); resp.setAc(ac); resp.process(temp); diff --git a/src/main/java/org/well/well/kafka/StandardDataUtils.java b/src/main/java/org/well/well/kafka/StandardDataUtils.java index dba65a8..a4e4b84 100644 --- a/src/main/java/org/well/well/kafka/StandardDataUtils.java +++ b/src/main/java/org/well/well/kafka/StandardDataUtils.java @@ -14,10 +14,12 @@ List> standardStatusFomateList = new ArrayList<>(); if (realParam != null) { for (int i = 0; i < realParam.length; i++) { - Map standardStatusMap = new HashMap<>(); - standardStatusMap.put("Key", standardkeyParm[i]); - standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); - standardStatusFomateList.add(standardStatusMap); + if(jsonObject.containsKey(realParam[i].toString())){ + Map standardStatusMap = new HashMap<>(); + standardStatusMap.put("Key", standardkeyParm[i]); + standardStatusMap.put("Value", jsonObject.get(realParam[i]).toString()); + standardStatusFomateList.add(standardStatusMap); + } } } if (!CollectionUtils.isEmpty(appenList)) { diff --git a/src/main/resources/wellSensor/77042.jaas.conf b/src/main/resources/wellSensor/77042.jaas.conf deleted file mode 100644 index 3abd31a..0000000 --- a/src/main/resources/wellSensor/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/wellSensor/consumer.properties b/src/main/resources/wellSensor/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/wellSensor/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/wellSensor/kafkaSecurityMode b/src/main/resources/wellSensor/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/wellSensor/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/src/main/resources/wellSensor/krb5.conf b/src/main/resources/wellSensor/krb5.conf deleted file mode 100644 index 003c6c7..0000000 --- a/src/main/resources/wellSensor/krb5.conf +++ /dev/null @@ -1,48 +0,0 @@ -[kdcdefaults] -kdc_ports = 192.168.65.19:21732 -kdc_tcp_ports = "" - -[libdefaults] -default_realm = HADOOP.COM -kdc_timeout = 2500 -clockskew = 300 -use_dns_lookup = 0 -udp_preference_limit = 1465 -max_retries = 5 -dns_lookup_kdc = false -dns_lookup_realm = false -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -allow_extend_version = false -default_ccache_name = FILE:/tmp//krb5cc_%{uid} - -[realms] -HADOOP.COM = { -kdc = 192.168.65.19:21732 -kdc = 192.168.65.18:21732 -admin_server = 192.168.65.19:21730 -admin_server = 192.168.65.18:21730 -kpasswd_server = 192.168.65.19:21731 -kpasswd_server = 192.168.65.18:21731 -kpasswd_port = 21731 -kadmind_port = 21730 -kadmind_listen = 192.168.65.19:21730 -kpasswd_listen = 192.168.65.19:21731 -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/kadm5.acl -dict_file = /opt/huawei/Bigdata/common/runtime/security/weakPasswdDic/weakPasswdForKdc.ini -key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/.k5.HADOOP.COM -} - -[domain_realm] -.hadoop.com = HADOOP.COM - -[logging] -kdc = SYSLOG:INFO:DAEMON -admin_server = SYSLOG:INFO:DAEMON -default = SYSLOG:NOTICE:DAEMON diff --git a/src/main/resources/wellSensor/producer.properties b/src/main/resources/wellSensor/producer.properties deleted file mode 100644 index 5e6446a..0000000 --- a/src/main/resources/wellSensor/producer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -acks = 1 -bootstrap.servers = 192.168.65.16:21007,192.168.65.15:21007,192.168.65.14:21007 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/wellSensor/user.keytab b/src/main/resources/wellSensor/user.keytab deleted file mode 100644 index a10b711..0000000 --- a/src/main/resources/wellSensor/user.keytab +++ /dev/null Binary files differ