diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java index 9d3fa9c..846a50a 100644 --- a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java +++ b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java @@ -69,7 +69,7 @@ // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id - props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));//这是默认的 // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java index 9d3fa9c..846a50a 100644 --- a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java +++ b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java @@ -69,7 +69,7 @@ // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id - props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));//这是默认的 // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 diff --git a/src/main/resources/77042.jaas.conf b/src/main/resources/77042.jaas.conf deleted file mode 100644 index a86b107..0000000 --- a/src/main/resources/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java index 9d3fa9c..846a50a 100644 --- a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java +++ b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java @@ -69,7 +69,7 @@ // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id - props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));//这是默认的 // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 diff --git a/src/main/resources/77042.jaas.conf b/src/main/resources/77042.jaas.conf deleted file mode 100644 index a86b107..0000000 --- a/src/main/resources/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/consumer.properties b/src/main/resources/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java index 9d3fa9c..846a50a 100644 --- a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java +++ b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java @@ -69,7 +69,7 @@ // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id - props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));//这是默认的 // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 diff --git a/src/main/resources/77042.jaas.conf b/src/main/resources/77042.jaas.conf deleted file mode 100644 index a86b107..0000000 --- a/src/main/resources/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/consumer.properties b/src/main/resources/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/kafkaSecurityMode b/src/main/resources/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java index 9d3fa9c..846a50a 100644 --- a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java +++ b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java @@ -69,7 +69,7 @@ // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id - props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));//这是默认的 // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 diff --git a/src/main/resources/77042.jaas.conf b/src/main/resources/77042.jaas.conf deleted file mode 100644 index a86b107..0000000 --- a/src/main/resources/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/consumer.properties b/src/main/resources/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/kafkaSecurityMode b/src/main/resources/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/src/main/resources/krb5.conf b/src/main/resources/krb5.conf deleted file mode 100644 index 003c6c7..0000000 --- a/src/main/resources/krb5.conf +++ /dev/null @@ -1,48 +0,0 @@ -[kdcdefaults] -kdc_ports = 192.168.65.19:21732 -kdc_tcp_ports = "" - -[libdefaults] -default_realm = HADOOP.COM -kdc_timeout = 2500 -clockskew = 300 -use_dns_lookup = 0 -udp_preference_limit = 1465 -max_retries = 5 -dns_lookup_kdc = false -dns_lookup_realm = false -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -allow_extend_version = false -default_ccache_name = FILE:/tmp//krb5cc_%{uid} - -[realms] -HADOOP.COM = { -kdc = 192.168.65.19:21732 -kdc = 192.168.65.18:21732 -admin_server = 192.168.65.19:21730 -admin_server = 192.168.65.18:21730 -kpasswd_server = 192.168.65.19:21731 -kpasswd_server = 192.168.65.18:21731 -kpasswd_port = 21731 -kadmind_port = 21730 -kadmind_listen = 192.168.65.19:21730 -kpasswd_listen = 192.168.65.19:21731 -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/kadm5.acl -dict_file = /opt/huawei/Bigdata/common/runtime/security/weakPasswdDic/weakPasswdForKdc.ini -key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/.k5.HADOOP.COM -} - -[domain_realm] -.hadoop.com = HADOOP.COM - -[logging] -kdc = SYSLOG:INFO:DAEMON -admin_server = SYSLOG:INFO:DAEMON -default = SYSLOG:NOTICE:DAEMON diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java index 9d3fa9c..846a50a 100644 --- a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java +++ b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java @@ -69,7 +69,7 @@ // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id - props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));//这是默认的 // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 diff --git a/src/main/resources/77042.jaas.conf b/src/main/resources/77042.jaas.conf deleted file mode 100644 index a86b107..0000000 --- a/src/main/resources/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/consumer.properties b/src/main/resources/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/kafkaSecurityMode b/src/main/resources/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/src/main/resources/krb5.conf b/src/main/resources/krb5.conf deleted file mode 100644 index 003c6c7..0000000 --- a/src/main/resources/krb5.conf +++ /dev/null @@ -1,48 +0,0 @@ -[kdcdefaults] -kdc_ports = 192.168.65.19:21732 -kdc_tcp_ports = "" - -[libdefaults] -default_realm = HADOOP.COM -kdc_timeout = 2500 -clockskew = 300 -use_dns_lookup = 0 -udp_preference_limit = 1465 -max_retries = 5 -dns_lookup_kdc = false -dns_lookup_realm = false -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -allow_extend_version = false -default_ccache_name = FILE:/tmp//krb5cc_%{uid} - -[realms] -HADOOP.COM = { -kdc = 192.168.65.19:21732 -kdc = 192.168.65.18:21732 -admin_server = 192.168.65.19:21730 -admin_server = 192.168.65.18:21730 -kpasswd_server = 192.168.65.19:21731 -kpasswd_server = 192.168.65.18:21731 -kpasswd_port = 21731 -kadmind_port = 21730 -kadmind_listen = 192.168.65.19:21730 -kpasswd_listen = 192.168.65.19:21731 -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/kadm5.acl -dict_file = /opt/huawei/Bigdata/common/runtime/security/weakPasswdDic/weakPasswdForKdc.ini -key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/.k5.HADOOP.COM -} - -[domain_realm] -.hadoop.com = HADOOP.COM - -[logging] -kdc = SYSLOG:INFO:DAEMON -admin_server = SYSLOG:INFO:DAEMON -default = SYSLOG:NOTICE:DAEMON diff --git a/src/main/resources/producer.properties b/src/main/resources/producer.properties deleted file mode 100644 index 5e6446a..0000000 --- a/src/main/resources/producer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -acks = 1 -bootstrap.servers = 192.168.65.16:21007,192.168.65.15:21007,192.168.65.14:21007 -sasl.kerberos.service.name = kafka diff --git a/pom.xml b/pom.xml index f88f517..faa8944 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,13 @@ + + + org.springframework.boot + spring-boot-starter + 2.4.5 + + io.netty @@ -164,6 +171,7 @@ slf4j-log4j12 1.7.30 + org.apache.kafka kafka-streams-examples @@ -183,6 +191,14 @@ + + + + org.projectlombok + lombok + 1.18.20 + + org.apache.kafka kafka-streams diff --git a/src/main/java/com/casic/yizhuang/Main.java b/src/main/java/com/casic/yizhuang/Main.java index 408006f..be42917 100644 --- a/src/main/java/com/casic/yizhuang/Main.java +++ b/src/main/java/com/casic/yizhuang/Main.java @@ -8,14 +8,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); + public static void main(String[] args) throws Exception { // 初始化日志路径 - String path=System.getProperty("user.dir"); - System.setProperty("log.base",path); + String path = System.getProperty("user.dir"); + System.setProperty("log.base", path); System.out.println(path); logger.info("Start scheduler"); @@ -28,12 +32,7 @@ System.out.println("Start server"); logger.info("Start server"); new Thread(new Server()).start(); - - System.out.println("Start Kafka Consume"); - logger.info("Start Kafka Consume"); - - Producer.send("怎么回事儿"); - new KafkaClient().kafkaDataConsumer(); + new Thread(new KafkaClient()).start(); } } diff --git a/src/main/java/com/casic/yizhuang/json/MBody.java b/src/main/java/com/casic/yizhuang/json/MBody.java index bdf1a79..46548d7 100644 --- a/src/main/java/com/casic/yizhuang/json/MBody.java +++ b/src/main/java/com/casic/yizhuang/json/MBody.java @@ -1,44 +1,15 @@ package com.casic.yizhuang.json; +import lombok.Data; + import java.util.List; +@Data public class MBody { private String bType; private Integer cell; private List datas; private String logTime; - - public String getbType() { - return bType; - } - - public void setbType(String bType) { - this.bType = bType; - } - - public String getLogTime() { - return logTime; - } - - public void setLogTime(String logTime) { - this.logTime = logTime; - } - - - public List getDatas() { - return datas; - } - - public void setDatas(List datas) { - this.datas = datas; - } - - public Integer getCell() { - return cell; - } - - public void setCell(Integer cell) { - this.cell = cell; - } + private Boolean kafkaDataFlag; } diff --git a/src/main/java/com/casic/yizhuang/json/Message.java b/src/main/java/com/casic/yizhuang/json/Message.java index 1582918..c1a1b9b 100644 --- a/src/main/java/com/casic/yizhuang/json/Message.java +++ b/src/main/java/com/casic/yizhuang/json/Message.java @@ -1,60 +1,16 @@ package com.casic.yizhuang.json; +import lombok.Data; + +@Data public class Message { private String mType; private String devType; + + private String devCode; private MBody mBody; - private boolean kafkaDataFlag; private Long ts; - //水质的暂时不用这个做标记位 - public boolean getKafkaDataFlag() { - return kafkaDataFlag; - } - - public void setKafkaDataFlag(boolean kafkaDataFlag) { - this.kafkaDataFlag = kafkaDataFlag; - } - - public String getMType() { - return mType; - } - - public void setMType(String mType) { - this.mType = mType; - } - - public String getDevType() { - return devType; - } - - public void setDevType(String devType) { - this.devType = devType; - } - - public String getDevCode() { - return devCode; - } - - public void setDevCode(String devCode) { - this.devCode = devCode; - } - - public Long getTs() { - return ts; - } - - public void setTs(Long ts) { - this.ts = ts; - } - - public MBody getMBody() { - return mBody; - } - - public void setMBody(MBody mBody) { - this.mBody = mBody; - } } \ No newline at end of file diff --git a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java index 8810368..41ec244 100644 --- a/src/main/java/com/casic/yizhuang/json/device/WasteGas.java +++ b/src/main/java/com/casic/yizhuang/json/device/WasteGas.java @@ -1,67 +1,20 @@ package com.casic.yizhuang.json.device; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +@Data public class WasteGas { - private Float h2s; - private Float co; - private Float o2; - private Float ch4; + private Float power; - private Boolean liquidSwitch; + private String liquidSwitch; private String uptime; - - public Float getH2S() { - return h2s; - } - - public void setH2S(Float h2s) { - this.h2s = h2s; - } - - public Float getCO() { - return co; - } - - public void setCO(Float co) { - this.co = co; - } - - public Float getO2() { - return o2; - } - - public void setO2(Float o2) { - this.o2 = o2; - } - - public Float getCH4() { - return ch4; - } - - public void setCH4(Float ch4) { - this.ch4 = ch4; - } - - public Float getPower() { - return power; - } - - public void setPower(Float power) { - this.power = power; - } - - public Boolean getLiquidSwitch() { - return liquidSwitch; - } - - public void setLiquidSwitch(Boolean liquidSwitch) { - this.liquidSwitch = liquidSwitch; - } - - public String getUptime() { - return uptime; - } - - public void setUptime(String uptime) { - this.uptime = uptime; - } + @JSONField(name="H2S") + private Float h2s; + @JSONField(name="CO") + private Float co; + @JSONField(name="O2") + private Float o2; + @JSONField(name="CH4") + private Float ch4; } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java index a50b3fc..7cb9d60 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaClient.java @@ -24,46 +24,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.time.Duration; import java.util.*; -public class KafkaClient { +public class KafkaClient implements Runnable { private static final Logger logger = LoggerFactory.getLogger(KafkaClient.class); private KafkaConsumer consumer; - public void kafkaDataConsumer() throws Exception { -// Properties props = new Properties(); -// props.put("bootstrap.servers", "10.10.4.109:21005,10.10.4.110:21005,10.10.4.111:21005"); -// props.put("group.id", "ConsumerXX"); -// props.put("enable.auto.commit", "true"); -// props.put("auto.commit.interval.ms", "1000"); -// props.put("key.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// props.put("value.deserializer", -// "org.apache.kafka.common.serialization.StringDeserializer"); -// KafkaConsumer consumer = new KafkaConsumer<>(props); -// consumer.subscribe(Arrays.asList("MSGQUEUE_8204")); + public void kafkaDataConsumer() { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 if (LoginUtil.isSecurityModel()) { - LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + try { + logger.info("Securitymode start."); + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } catch ( + IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured : {}.", e); + return; + } } + + + Properties props = KafkaUtils.consumerInitProperties(); consumer = new KafkaConsumer(props); // 订阅 - consumer.subscribe(Collections.singletonList(KafkaProperties.REVICE_DATA_TOPIC)); + consumer.subscribe(Collections.singletonList(KafkaProperties.SEND_DATA_TOPIC)); while (true) { - ConsumerRecords records = consumer.poll(100); + System.out.println("--1--1--1--"); + ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { String msg = record.value(); // msg 为取得的一条实时数据。消费这条数据,如输出到STDOUT - logger.info("kafka接收数据-----"+msg); + logger.info("kafka接收数据-----" + msg); // String msg = "{\"Status\":\"[{\\\"Value\\\":7.7,\\\"Key\\\":\\\"Temp\\\"},{\\\"Value\\\":99.9,\\\"Key\\\":\\\"Humi\\\"},{\\\"Value\\\":100,\\\"Key\\\":\\\"Power\\\"}]\",\"DevType\":\"AirTempHumi\",\"LogTime\":\"2020-03-16 08:47:13\",\"DevID\":\"79WGX7\",\"Provider\":\"KaiNa\"}"; - if (msg.contains("ChangFeng")|| !msg.contains("Status")) { - return; + if (msg.contains("ChangFeng") || !msg.contains("Status")) { + continue; } try { @@ -83,13 +86,6 @@ statusMap.put(status.getKey(), status.getValue()); } - Class wellInfoClass = WellInfo.class; - List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); - - String wellcode = ""; - if (!wellInfoList.isEmpty()) { - wellcode = wellInfoList.get(0).getWellcode(); - } String strJson = ""; MBody mBody = new MBody(); @@ -102,24 +98,25 @@ switch (message.getDevType()) { case "HarmfulGas"://有害气体 - mBody.setbType("WasteGasData"); + mBody.setBType("WasteGasData"); WasteGas wasteGas = new WasteGas(); - wasteGas.setH2S(statusMap.get("H2S")); - wasteGas.setCO(statusMap.get("CO")); + wasteGas.setH2s(statusMap.get("H2S")); + wasteGas.setCo(statusMap.get("CO")); wasteGas.setO2(statusMap.get("O2")); - wasteGas.setCH4(statusMap.get("CH4")); - wasteGas.setH2S(statusMap.get("H2S")); + wasteGas.setCh4(statusMap.get("CH4")); + wasteGas.setLiquidSwitch(statusMap.get("LiquidSwitch").toString()); wasteGas.setUptime(logTime); + wasteGas.setPower(statusMap.get("Power")); datas.add(wasteGas); - + mBody.setKafkaDataFlag(true); + mBody.setDatas(datas); m.setMType("Data"); m.setDevType("WasteGas"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -127,7 +124,7 @@ break; case "AirTempHumi"://温湿度 - mBody.setbType("TempHumiData"); + mBody.setBType("TempHumiData"); mBody.setCell(cell); TempHumi tempHumi = new TempHumi(); @@ -137,21 +134,20 @@ datas.add(tempHumi); mBody.setDatas(datas); - + mBody.setKafkaDataFlag(true); m.setMType("Data"); m.setDevType("TempHumi"); m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); new Client().send(strJson + "\r\n"); break; - case "ManholeCover": - mBody.setbType("WellData"); + case "ManholeCover": //井盖儿 + mBody.setBType("WellData"); Well well = new Well(); if (statusMap.get("Status") == 0) { @@ -160,7 +156,7 @@ } else { break; } - + mBody.setKafkaDataFlag(true); datas.add(well); mBody.setDatas(datas); @@ -169,7 +165,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -177,8 +172,8 @@ new Client().send(strJson + "\r\n"); break; - case "Location": - mBody.setbType("LocatorData"); + case "Location": //井盖定位检测仪 + mBody.setBType("LocatorData"); Locator locator = new Locator(); locator.setLongitude(statusMap.get("Lon")); @@ -193,7 +188,6 @@ m.setDevCode(devId); m.setMBody(mBody); m.setTs(0L); - m.setKafkaDataFlag(true); strJson = JSON.toJSONString(m); System.out.println(strJson); @@ -208,7 +202,12 @@ flowmeter.setFlowVelocity(statusMap.get("Speed")); flowmeter.setWaterLevel(statusMap.get("Level")); flowmeter.setTemperature(statusMap.get("Temp")); - + Class wellInfoClass = WellInfo.class; + List wellInfoList = new DAO<>().query(wellInfoClass, Common.SELECT_WELLCODE, devId); + String wellcode = ""; + if (!wellInfoList.isEmpty()) { + wellcode = wellInfoList.get(0).getWellcode(); + } new DAO<>().Update(Common.INSERT_FLOWMETER, devId, wellcode, flowmeter.getWaterLevel(), flowmeter.getFlowVelocity(), flowmeter.getTemperature(), flowmeter.getInstantFlow(), flowmeter.getTotalFlow(), logTime); break; @@ -228,6 +227,10 @@ } + @Override + public void run() { + this.kafkaDataConsumer(); + } } diff --git a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java index 2473897..69c42a8 100644 --- a/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java +++ b/src/main/java/com/casic/yizhuang/kafka/KafkaProperties.java @@ -14,7 +14,6 @@ // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 public final static String SEND_DATA_TOPIC = "TEMPSTORE_8204"; - public final static String REVICE_DATA_TOPIC = "MSGQUEUE_8204"; public final static String ALARM_TOPIC = "MSGQUEUE_8287"; diff --git a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java index ed61e04..2841b1b 100644 --- a/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java +++ b/src/main/java/com/casic/yizhuang/quartz/ReadAccessJob.java @@ -9,6 +9,7 @@ import com.casic.yizhuang.model.WellInfo; import com.casic.yizhuang.mysql.DAO; import com.casic.yizhuang.util.Common; +import netscape.javascript.JSObject; import org.quartz.Job; import org.quartz.JobExecutionContext; diff --git a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java index be27f8d..b5b16a7 100644 --- a/src/main/java/com/casic/yizhuang/quartz/Scheduler.java +++ b/src/main/java/com/casic/yizhuang/quartz/Scheduler.java @@ -18,7 +18,9 @@ org.quartz.Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail jobDetail = JobBuilder.newJob(ReadAccessJob.class) .withIdentity("job", "group").build(); - Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger", "triggerGroup") + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger", "triggerGroup") .startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(5) diff --git a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java index 9d3fa9c..846a50a 100644 --- a/src/main/java/com/casic/yizhuang/util/KafkaUtils.java +++ b/src/main/java/com/casic/yizhuang/util/KafkaUtils.java @@ -69,7 +69,7 @@ // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id - props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));//这是默认的 // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 diff --git a/src/main/resources/77042.jaas.conf b/src/main/resources/77042.jaas.conf deleted file mode 100644 index a86b107..0000000 --- a/src/main/resources/77042.jaas.conf +++ /dev/null @@ -1,27 +0,0 @@ -StormClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -KafkaClient { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; -Client { -com.sun.security.auth.module.Krb5LoginModule required -useKeyTab=true -keyTab="D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\user.keytab" -principal="kafkauser@HADOOP.COM" -useTicketCache=false -storeKey=true -debug=true; -}; diff --git a/src/main/resources/consumer.properties b/src/main/resources/consumer.properties deleted file mode 100644 index 1451c84..0000000 --- a/src/main/resources/consumer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -group.id = example-group1 -auto.commit.interval.ms = 60000 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/kafkaSecurityMode b/src/main/resources/kafkaSecurityMode deleted file mode 100644 index ed59a5e..0000000 --- a/src/main/resources/kafkaSecurityMode +++ /dev/null @@ -1 +0,0 @@ -kafka.client.security.mode = yes diff --git a/src/main/resources/krb5.conf b/src/main/resources/krb5.conf deleted file mode 100644 index 003c6c7..0000000 --- a/src/main/resources/krb5.conf +++ /dev/null @@ -1,48 +0,0 @@ -[kdcdefaults] -kdc_ports = 192.168.65.19:21732 -kdc_tcp_ports = "" - -[libdefaults] -default_realm = HADOOP.COM -kdc_timeout = 2500 -clockskew = 300 -use_dns_lookup = 0 -udp_preference_limit = 1465 -max_retries = 5 -dns_lookup_kdc = false -dns_lookup_realm = false -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -allow_extend_version = false -default_ccache_name = FILE:/tmp//krb5cc_%{uid} - -[realms] -HADOOP.COM = { -kdc = 192.168.65.19:21732 -kdc = 192.168.65.18:21732 -admin_server = 192.168.65.19:21730 -admin_server = 192.168.65.18:21730 -kpasswd_server = 192.168.65.19:21731 -kpasswd_server = 192.168.65.18:21731 -kpasswd_port = 21731 -kadmind_port = 21730 -kadmind_listen = 192.168.65.19:21730 -kpasswd_listen = 192.168.65.19:21731 -renewable = false -forwardable = false -renew_lifetime = 0m -max_renewable_life = 30m -acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/kadm5.acl -dict_file = /opt/huawei/Bigdata/common/runtime/security/weakPasswdDic/weakPasswdForKdc.ini -key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.1.2.2/install/FusionInsight-kerberos-1.18/kerberos/var/krb5kdc/.k5.HADOOP.COM -} - -[domain_realm] -.hadoop.com = HADOOP.COM - -[logging] -kdc = SYSLOG:INFO:DAEMON -admin_server = SYSLOG:INFO:DAEMON -default = SYSLOG:NOTICE:DAEMON diff --git a/src/main/resources/producer.properties b/src/main/resources/producer.properties deleted file mode 100644 index 5e6446a..0000000 --- a/src/main/resources/producer.properties +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol = SASL_PLAINTEXT -kerberos.domain.name = hadoop.hadoop.com -acks = 1 -bootstrap.servers = 192.168.65.16:21007,192.168.65.15:21007,192.168.65.14:21007 -sasl.kerberos.service.name = kafka diff --git a/src/main/resources/user.keytab b/src/main/resources/user.keytab deleted file mode 100644 index a10b711..0000000 --- a/src/main/resources/user.keytab +++ /dev/null Binary files differ