diff --git a/pom.xml b/pom.xml
index 6d8a31d..21ec64a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,6 +13,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.8</java.version>
+        <kafka.version>2.4.0-hw-ei-312005</kafka.version>
@@ -25,12 +26,6 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-tomcat</artifactId>
-            <version>2.4.5</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
             <version>2.4.5</version>
         </dependency>
@@ -56,18 +51,6 @@
         <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>4.5.9</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpcore</artifactId>
-            <version>4.4.8</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
             <version>3.1</version>
@@ -79,11 +62,6 @@
             <version>2.3.0</version>
         </dependency>
-        <dependency>
-            <groupId>org.postgresql</groupId>
-            <artifactId>postgresql</artifactId>
-            <version>42.2.19</version>
-        </dependency>
         <dependency>
             <groupId>org.projectlombok</groupId>
@@ -105,19 +83,11 @@
         <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-            <version>3.1.0</version>
-            <type>jar</type>
-        </dependency>
-
-        <dependency>
             <groupId>org.springframework.data</groupId>
             <artifactId>spring-data-redis</artifactId>
             <version>2.4.8</version>
         </dependency>
-
         <dependency>
             <groupId>com.oracle.database.jdbc</groupId>
             <artifactId>ojdbc6</artifactId>
@@ -130,8 +100,71 @@
             <version>1.0.0</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>xml-apis</groupId>
+                    <artifactId>xml-apis</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+            <version>1.4.01</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+            <version>${kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>connect-json</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <repositories>
+        <repository>
+            <id>bigdata</id>
+            <url>http://wlg1.artifactory.cd-cloud-artifact.tools.huawei.com/artifactory/cbu-maven-public/</url>
+        </repository>
+        <repository>
+            <id>huaweicloudsdk</id>
+            <url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>central</id>
+            <name>Maven Central</name>
+            <url>https://repo1.maven.org/maven2/</url>
+        </repository>
+    </repositories>
diff --git a/src/main/java/com/casic/config/KafkaTopicConfig.java b/src/main/java/com/casic/config/KafkaTopicConfig.java
new file mode 100644
index 0000000..293f173
--- /dev/null
+++ b/src/main/java/com/casic/config/KafkaTopicConfig.java
@@ -0,0 +1,19 @@
+package com.casic.config;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+public class KafkaTopicConfig {
+
+    @Value("${casic.data.topic}")
+    private String kafkaSendTopic;
+
+    @Value("${casic.alarm.topic}")
+    private String kafkaAlarmSendTopic;
+
+    @Value("${casic.data.kafka-Kerb-Url}")
+    private String kafkaKerbUrl;
+}
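For context, a minimal sketch of the application.properties entries KafkaTopicConfig binds. The two topic names reuse the defaults hard-coded in KafkaProperties later in this patch; the kafka-Kerb-Url value is a hypothetical placeholder, since the patch never shows it:

    casic.data.topic=TEMPSTORE_8204
    casic.alarm.topic=MSGQUEUE_8287
    # hypothetical placeholder; point at the cluster's krb5.conf
    casic.data.kafka-Kerb-Url=D:\\kafka\\krb5.conf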
diff --git a/src/main/java/com/casic/config/task/TaskConfigurer.java b/src/main/java/com/casic/config/task/TaskConfigurer.java
index d126324..cb66285 100644
--- a/src/main/java/com/casic/config/task/TaskConfigurer.java
+++ b/src/main/java/com/casic/config/task/TaskConfigurer.java
@@ -22,10 +22,10 @@
     @Override
     public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
-        TriggerTask triggrtTask = new TriggerTask(wellLocalData(),
-                triggerContext -> new CronTrigger(timeConfig.getWellLocalTime()).nextExecutionTime(triggerContext));
-        taskRegistrar.addTriggerTask(triggrtTask);
-        triggrtTask = new TriggerTask(wellCoverData(),
+//        TriggerTask triggrtTask = new TriggerTask(wellLocalData(),
+//                triggerContext -> new CronTrigger(timeConfig.getWellLocalTime()).nextExecutionTime(triggerContext));
+//        taskRegistrar.addTriggerTask(triggrtTask);
+        TriggerTask triggrtTask = new TriggerTask(wellCoverData(),
                 triggerContext -> new CronTrigger(timeConfig.getWellCoverTime()).nextExecutionTime(triggerContext));
         taskRegistrar.addTriggerTask(triggrtTask);
         triggrtTask = new TriggerTask(liquidData(),
diff --git a/src/main/java/com/casic/dao/DayDataDao.java b/src/main/java/com/casic/dao/DayDataDao.java
index 11b5eaf..2674497 100644
--- a/src/main/java/com/casic/dao/DayDataDao.java
+++ b/src/main/java/com/casic/dao/DayDataDao.java
@@ -11,11 +11,16 @@
 @Mapper
 public interface DayDataDao {
 
-    List<Map<String, Object>> getWellCoverByType();
-    List<Map<String, Object>> getStandardLiquid();
-    List<Map<String, Object>> getLiquidByType();
-    List<Map<String, Object>> getWellLocalByType();
+    List<Map<String, Object>> getWellCoverByType();
 
-    List<Map<String, Object>> getHarmfulCode();
+    int clearOnline(@Param("devcode") String devcode);
+
+    List<Map<String, Object>> getStandardLiquid();
+
+    List<Map<String, Object>> getLiquidByType();
+
+    List<Map<String, Object>> getWellLocalByType();
+
+    List<Map<String, Object>> getHarmfulCode();
 }
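clearOnline is the one new mapper method here; the patch does not include its mapper XML, but a hypothetical entry might look like the following (table and column names are assumptions, not taken from this patch):

    <update id="clearOnline">
        UPDATE device SET online_state = 0 WHERE devcode = #{devcode}
    </update>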
diff --git a/src/main/java/com/casic/entity/StandardData.java b/src/main/java/com/casic/entity/StandardData.java
new file mode 100644
index 0000000..fe7bdad
--- /dev/null
+++ b/src/main/java/com/casic/entity/StandardData.java
@@ -0,0 +1,29 @@
+package com.casic.entity;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import lombok.Data;
+
+@Data
+public class StandardData {
+
+    @JSONField(name = "DevID")
+    private String DevID;
+    @JSONField(name = "DevType")
+    private String DevType;
+    @JSONField(name = "Provider")
+    private String Provider;
+    @JSONField(name = "Status")
+    private String Status;
+    @JSONField(name = "LogTime")
+    private String LogTime;
+
+    public StandardData(String DevID, String DevType, String Status, String LogTime) {
+        this.DevID = DevID;
+        this.DevType = DevType;
+        this.Provider = "Provider-ChangFeng";
+        this.Status = Status;
+        this.LogTime = LogTime;
+    }
+
+}
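Since the producer below sends plain strings, entities like StandardData are serialized with fastjson before sending. A quick sketch with illustrative values (fastjson emits the @JSONField names, typically in alphabetical order):

    StandardData data = new StandardData("312101890", "Well", "{\"liquid\":1.27}", "2021-07-01 12:00:00");
    String json = com.alibaba.fastjson.JSON.toJSONString(data);
    // {"DevID":"312101890","DevType":"Well","LogTime":"2021-07-01 12:00:00","Provider":"Provider-ChangFeng","Status":"{\"liquid\":1.27}"}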
diff --git a/src/main/java/com/casic/kafka/Producer.java b/src/main/java/com/casic/kafka/Producer.java
new file mode 100644
index 0000000..7ef048f
--- /dev/null
+++ b/src/main/java/com/casic/kafka/Producer.java
@@ -0,0 +1,52 @@
+package com.casic.kafka;
+
+import com.casic.kafka.util.KafkaProperties;
+import com.casic.kafka.util.KafkaUtils;
+import com.casic.kafka.util.LoginUtil;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class Producer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
+    private static KafkaProducer<String, String> producer;
+
+    static {
+        try {
+            if (LoginUtil.isSecurityModel()) {
+                LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE);
+            }
+            Properties props = KafkaUtils.producerInitProperties();
+            producer = new KafkaProducer<>(props);
+        } catch (IOException ex) {
+            LOG.error("Kafka producer initialization failed.", ex);
+        }
+    }
+
+    public static void send(String content, String topic) {
+        LOG.debug("producer start.");
+        if (producer == null) {
+            // NOTE: with security authentication enabled, manually replace this
+            // with the machine-machine account you applied for
+            Properties props = KafkaUtils.producerInitProperties();
+            producer = new KafkaProducer<>(props);
+        }
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "", content);
+        try {
+            // send synchronously
+            producer.send(record).get();
+            LOG.info("Topic " + topic + ": Kafka sent content-------" + content);
+        } catch (InterruptedException ie) {
+            LOG.info("The InterruptedException occurred : {}.", ie);
+        } catch (ExecutionException ee) {
+            LOG.info("The ExecutionException occurred : {}.", ee);
+        }
+//        producer.close();
+    }
+}
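A minimal caller sketch for this producer (payload values are illustrative; the topic constant comes from KafkaProperties below):

    String payload = com.alibaba.fastjson.JSON.toJSONString(
            new com.casic.entity.StandardData("312101890", "Well", "{\"liquid\":1.27}", "2021-07-01 12:00:00"));
    com.casic.kafka.Producer.send(payload, com.casic.kafka.util.KafkaProperties.DATA_TOPIC);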
diff --git a/src/main/java/com/casic/kafka/constant/TopicConstant.java b/src/main/java/com/casic/kafka/constant/TopicConstant.java
new file mode 100644
index 0000000..cebd2b2
--- /dev/null
+++ b/src/main/java/com/casic/kafka/constant/TopicConstant.java
@@ -0,0 +1,8 @@
+package com.casic.kafka.constant;
+
+public class TopicConstant {
+
+    public static final String DATA_TOPIC = "dataTopic";
+    public static final String ALARM_TOPIC = "alarmTopic";
+
+}
diff --git a/src/main/java/com/casic/kafka/util/KafkaProperties.java b/src/main/java/com/casic/kafka/util/KafkaProperties.java
new file mode 100644
index 0000000..dedf438
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/KafkaProperties.java
@@ -0,0 +1,129 @@
+package com.casic.kafka.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public final class KafkaProperties {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
+
+    // Topic names; in security mode, an administrator account must grant the
+    // current user access to these topics
+    public final static String DATA_TOPIC = "TEMPSTORE_8204";
+    public final static String ALARM_TOPIC = "MSGQUEUE_8287";
+
+    /**
+     * Keytab file name of the machine-machine account the user applied for
+     */
+    public static final String USER_KEYTAB_FILE = "user.keytab";
+
+    /**
+     * Name of the machine-machine account the user applied for
+     */
+    public static final String USER_PRINCIPAL = "kafkauser";
+
+    private static Properties serverProps = new Properties();
+
+    private static Properties producerProps = new Properties();
+
+    private static Properties consumerProps = new Properties();
+
+    private static Properties clientProps = new Properties();
+
+    private static KafkaProperties instance = null;
+
+    private static final String filePath = "D:\\casic203\\software\\software\\data-creater\\kafka\\";
+
+    private KafkaProperties() {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        try {
+            File proFile = new File(filePath + "producer.properties");
+            if (proFile.exists()) {
+                producerProps.load(new FileInputStream(filePath + "producer.properties"));
+            }
+
+            File conFile = new File(filePath + "consumer.properties");
+            if (conFile.exists()) {
+                consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
+            }
+
+            File serFile = new File(filePath + "server.properties");
+            if (serFile.exists()) {
+                serverProps.load(new FileInputStream(filePath + "server.properties"));
+            }
+
+            File cliFile = new File(filePath + "client.properties");
+            if (cliFile.exists()) {
+                clientProps.load(new FileInputStream(filePath + "client.properties"));
+            }
+        } catch (IOException e) {
+            LOG.info("The Exception occurred.", e);
+        }
+    }
+
+    public synchronized static KafkaProperties getInstance() {
+        if (null == instance) {
+            instance = new KafkaProperties();
+        }
+        return instance;
+    }
+
+    /**
+     * Get a parameter value
+     *
+     * @param key      the properties key
+     * @param defValue default value
+     * @return the resolved value, or defValue if the key is not found
+     */
+    public String getValues(String key, String defValue) {
+        String rtValue = null;
+
+        if (null == key) {
+            LOG.error("key is null");
+        } else {
+            rtValue = getPropertiesValue(key);
+        }
+
+        if (null == rtValue) {
+            LOG.warn("KafkaProperties.getValues return null, key is " + key);
+            rtValue = defValue;
+        }
+
+        LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
+
+        return rtValue;
+    }
+
+    /**
+     * Look up a key, starting from server.properties
+     *
+     * @param key the properties key
+     * @return the value, or null if no file defines the key
+     */
+    private String getPropertiesValue(String key) {
+        String rtValue = serverProps.getProperty(key);
+
+        // not in server.properties, fall back to producer.properties
+        if (null == rtValue) {
+            rtValue = producerProps.getProperty(key);
+        }
+
+        // not in producer.properties, fall back to consumer.properties
+        if (null == rtValue) {
+            rtValue = consumerProps.getProperty(key);
+        }
+
+        // not in consumer.properties, fall back to client.properties
+        if (null == rtValue) {
+            rtValue = clientProps.getProperty(key);
+        }
+
+        return rtValue;
+    }
+}
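The lookup order getValues implements is server.properties, then producer.properties, then consumer.properties, then client.properties, then the supplied default. For example:

    KafkaProperties kafkaProc = KafkaProperties.getInstance();
    // resolves from the first file that defines the key, else "localhost:21007"
    String servers = kafkaProc.getValues("bootstrap.servers", "localhost:21007");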
diff --git a/src/main/java/com/casic/kafka/util/KafkaUtils.java b/src/main/java/com/casic/kafka/util/KafkaUtils.java
new file mode 100644
index 0000000..bbf6830
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/KafkaUtils.java
@@ -0,0 +1,136 @@
+package com.casic.kafka.util;
+
+import java.util.Properties;
+
+public class KafkaUtils {
+
+    // Maximum wait time for a single request (ms)
+    private final int waitTime = 1000;
+
+    // Broker connection address
+    private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
+
+    // Group id
+    private final static String GROUP_ID = "group.id";
+
+    // Deserializer class for message values
+    private final static String VALUE_DESERIALIZER = "value.deserializer";
+
+    // Deserializer class for message keys
+    private final static String KEY_DESERIALIZER = "key.deserializer";
+
+    // Protocol type: SASL_PLAINTEXT or PLAINTEXT are currently supported
+    private final static String SECURITY_PROTOCOL = "security.protocol";
+
+    // Service name
+    private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
+
+    // Kerberos domain name
+    private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
+
+    // Whether to auto-commit offsets
+    private final static String ENABLE_AUTO_COMMIT = "enable.auto.commit";
+
+    // Interval between automatic offset commits
+    private final static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
+
+    // Session timeout
+    private final static String SESSION_TIMEOUT_MS = "session.timeout.ms";
+
+    // Client ID
+    private final static String CLIENT_ID = "client.id";
+
+    // Key serializer class
+    private final static String KEY_SERIALIZER = "key.serializer";
+
+    // Value serializer class
+    private final static String VALUE_SERIALIZER = "value.serializer";
+
+    // Partitioner class name
+    private final static String PARTITIONER_NAME = "partitioner.class";
+
+    // Send 100 messages by default
+    private final static int MESSAGE_NUM = 100;
+
+    /**
+     * Keytab file name of the machine-machine account the user applied for
+     */
+    private static final String USER_KEYTAB_FILE = "user.keytab";
+
+    /**
+     * Name of the machine-machine account the user applied for
+     */
+    private static final String USER_PRINCIPAL = "kafkauser";
+
+    public static Properties consumerInitProperties() {
+        Properties props = new Properties();
+        KafkaProperties kafkaProc = KafkaProperties.getInstance();
+
+        // Broker connection address
+        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
+        // Group id
+        props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
+        // Whether to auto-commit offsets
+        props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
+        // Interval between automatic offset commits
+        props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS, "1000"));
+        // Session timeout
+        props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
+        // Deserializer class for message keys
+        props.put(KEY_DESERIALIZER,
+            kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
+        // Deserializer class for message values
+        props.put(VALUE_DESERIALIZER,
+            kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
+        // Security protocol type
+        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
+
+//        props.put(SASL_MECHANISM, "GSSAPI");
+        // Service name
+        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
+        // Kerberos domain name
+        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
+
+//        System.setProperty("java.security.auth.login.config", "D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\77042.jaas.conf");
+
+        return props;
+    }
+
+    public static Properties producerInitProperties() {
+        Properties props = new Properties();
+        KafkaProperties kafkaProc = KafkaProperties.getInstance();
+
+        // Broker address list
+        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
+        // Client ID
+        props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
+        // Key serializer class
+        props.put(KEY_SERIALIZER,
+            kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+        // Value serializer class
+        props.put(VALUE_SERIALIZER,
+            kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+        // Protocol type: SASL_PLAINTEXT or PLAINTEXT are currently supported
+        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
+        // Service name
+        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
+        // Kerberos domain name
+        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
+        // Partitioner class name
+        props.put(PARTITIONER_NAME,
+            kafkaProc.getValues(PARTITIONER_NAME, "com.casic.kafka.util.SimplePartitioner"));
+
+        System.setProperty("java.security.auth.login.config", "D:\\casic203\\software\\software\\data-creater\\kafka\\kafkaClient.jaas.conf");
+
+        return props;
+    }
+
+}
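consumerInitProperties is not exercised anywhere else in this patch; a minimal consumer sketch using it might look like the following (topic choice and poll loop are illustrative):

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaUtils.consumerInitProperties());
    consumer.subscribe(Collections.singletonList(KafkaProperties.DATA_TOPIC));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }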
diff --git a/src/main/java/com/casic/kafka/util/LoginUtil.java b/src/main/java/com/casic/kafka/util/LoginUtil.java
new file mode 100644
index 0000000..0cf3459
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/LoginUtil.java
@@ -0,0 +1,250 @@
+package com.casic.kafka.util;
+
+import com.casic.config.KafkaTopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+public class LoginUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
+
+    /**
+     * JAAS login-module section names for the supported components
+     */
+    public enum Module {
+        STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client");
+
+        private String name;
+
+        private Module(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+    }
+
+    /**
+     * line separator string
+     */
+    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+    /**
+     * jaas file postfix
+     */
+    private static final String JAAS_POSTFIX = ".jaas.conf";
+
+    /**
+     * is IBM jdk or not
+     */
+    private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
+
+    /**
+     * IBM jdk login module
+     */
+    private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
+
+    /**
+     * oracle jdk login module
+     */
+    private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
+
+    /**
+     * Zookeeper quorum principal.
+     */
+    public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
+
+    /**
+     * java security krb5 file path
+     */
+    public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
+
+    /**
+     * java security login file path
+     */
+    public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
+
+    private static String filePath;
+
+    static {
+        KafkaTopicConfig pcs = SpringUtils.getBean(KafkaTopicConfig.class);
+        filePath = pcs.getKafkaKerbUrl();
+    }
+
+    /**
+     * Generate the jaas.conf file and register it via the system property
+     *
+     * @param principal
+     * @param keytabPath
+     * @throws IOException
+     */
+    public static void setJaasFile(String principal, String keytabPath)
+        throws IOException {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        String filePath = "D:\\casic203\\software\\software\\20200616\\yizhuang\\config\\kafkaClient";
+        String jaasPath = filePath + JAAS_POSTFIX;
+
+        // Replace path separators for Windows
+        jaasPath = jaasPath.replace("\\", "\\\\");
+        // Delete any previously generated jaas file
+//        deleteJaasFile(jaasPath);
+        writeJaasFile(jaasPath, principal, keytabPath);
+        System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
+    }
+
+    /**
+     * Set the zookeeper server principal
+     *
+     * @param zkServerPrincipal
+     * @throws IOException
+     */
+    public static void setZookeeperServerPrincipal(String zkServerPrincipal)
+        throws IOException {
+        System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
+        String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
+        if (ret == null) {
+            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
+        }
+        if (!ret.equals(zkServerPrincipal)) {
+            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
+        }
+    }
+
+    /**
+     * Set the krb5 config file
+     *
+     * @param krb5ConfFile
+     * @throws IOException
+     */
+    public static void setKrb5Config(String krb5ConfFile)
+        throws IOException {
+        System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
+        String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
+        if (ret == null) {
+            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
+        }
+        if (!ret.equals(krb5ConfFile)) {
+            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
+        }
+    }
+
+    /**
+     * Write the jaas file
+     *
+     * @throws IOException on write failure
+     */
+    private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
+        throws IOException {
+        FileWriter writer = new FileWriter(new File(jaasPath));
+        try {
+            writer.write(getJaasConfContext(principal, keytabPath));
+            writer.flush();
+        } catch (IOException e) {
+            throw new IOException("Failed to create jaas.conf File");
+        } finally {
+            writer.close();
+        }
+    }
+
+    private static void deleteJaasFile(String jaasPath)
+        throws IOException {
+        File jaasFile = new File(jaasPath);
+        if (jaasFile.exists()) {
+            if (!jaasFile.delete()) {
+                throw new IOException("Failed to delete exists jaas file.");
+            }
+        }
+    }
+
+    private static String getJaasConfContext(String principal, String keytabPath) {
+        Module[] allModule = Module.values();
+        StringBuilder builder = new StringBuilder();
+        for (Module modlue : allModule) {
+            builder.append(getModuleContext(principal, keytabPath, modlue));
+        }
+        return builder.toString();
+    }
+
+    private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
+        StringBuilder builder = new StringBuilder();
+        if (IS_IBM_JDK) {
+            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
+            builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
+            builder.append("credsType=both").append(LINE_SEPARATOR);
+            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
+            builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
+            builder.append("debug=true;").append(LINE_SEPARATOR);
+            builder.append("};").append(LINE_SEPARATOR);
+        } else {
+            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
+            builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
+            builder.append("useKeyTab=true").append(LINE_SEPARATOR);
+            builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
+            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
+            builder.append("useTicketCache=false").append(LINE_SEPARATOR);
+            builder.append("storeKey=true").append(LINE_SEPARATOR);
+            builder.append("debug=true;").append(LINE_SEPARATOR);
+            builder.append("};").append(LINE_SEPARATOR);
+        }
+
+        return builder.toString();
+    }
+
+    public static void securityPrepare(String principal, String keyTabFile) throws IOException {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        String krbFile = filePath + "krb5.conf";
+        String userKeyTableFile = filePath + keyTabFile;
+        // Replace path separators for Windows
+        userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
+        krbFile = krbFile.replace("\\", "\\\\");
+        principal += "@HADOOP.COM";
+        LoginUtil.setKrb5Config(krbFile);
+        LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
+        LoginUtil.setJaasFile(principal, userKeyTableFile);
+    }
+
+    /**
+     * Check security mode
+     *
+     * @return boolean
+     */
+    public static Boolean isSecurityModel() {
+        Boolean isSecurity = false;
+
+        String krbFilePath = filePath + "kafkaSecurityMode";
+
+        Properties securityProps = new Properties();
+        // file does not exist.
+        if (!isFileExists(krbFilePath)) {
+            return isSecurity;
+        }
+        try {
+            securityProps.load(new FileInputStream(krbFilePath));
+
+            if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) {
+                isSecurity = true;
+            }
+        } catch (Exception e) {
+            LOG.info("The Exception occured : {}.", e);
+        }
+
+        // Return the parsed flag; returning true unconditionally would force
+        // Kerberos preparation even when kafkaSecurityMode says "no".
+        return isSecurity;
+    }
+
+    /*
+     * Check whether a file exists
+     */
+    private static boolean isFileExists(String fileName) {
+        File file = new File(fileName);
+
+        return file.exists();
+    }
+}
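For reference, on a non-IBM JDK the generated kafkaClient.jaas.conf contains one block like the following per Module (StormClient, KafkaClient, Client); the keytab path and principal below are placeholders, not values from this change:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="D:\\kafka\\user.keytab"
    principal="kafkauser@HADOOP.COM"
    useTicketCache=false
    storeKey=true
    debug=true;
    };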
diff --git a/src/main/java/com/casic/kafka/util/SimplePartitioner.java b/src/main/java/com/casic/kafka/util/SimplePartitioner.java
new file mode 100644
index 0000000..62bfa79
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/SimplePartitioner.java
@@ -0,0 +1,36 @@
+package com.casic.kafka.util;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import java.util.Map;
+
+public class SimplePartitioner implements Partitioner {
+
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        int partition = 0;
+        String partitionKey = (String) key;
+        int numPartitions = cluster.partitionsForTopic(topic).size();
+
+        try {
+            // Partitioning logic: route by the numeric value of the record key
+            partition = Integer.parseInt(partitionKey) % numPartitions;
+        } catch (NumberFormatException ne) {
+            // If the key cannot be parsed as a number, fall back to partition 0
+            partition = 0;
+        }
+
+        return partition;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
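A quick sketch of how keys map to partitions under this scheme; the 3-partition topic layout is illustrative, not part of the change:

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionDemo {
        public static void main(String[] args) {
            // With 3 partitions: Integer.parseInt("7") % 3 == 1, so this record goes to partition 1
            ProducerRecord<String, String> keyed =
                new ProducerRecord<>("TEMPSTORE_8204", "7", "{\"DevID\":\"0001\"}");
            // A non-numeric key throws NumberFormatException inside the partitioner -> partition 0.
            // Note that Producer.send(...) in this change always passes "" as the key,
            // so with SimplePartitioner every record it sends lands on partition 0.
            ProducerRecord<String, String> unkeyed =
                new ProducerRecord<>("TEMPSTORE_8204", "", "{\"DevID\":\"0002\"}");
        }
    }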
+ if (!isFileExists(krbFilePath)) { + return isSecurity; + } + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return true; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } +} diff --git a/src/main/java/com/casic/kafka/util/SimplePartitioner.java b/src/main/java/com/casic/kafka/util/SimplePartitioner.java new file mode 100644 index 0000000..62bfa79 --- /dev/null +++ b/src/main/java/com/casic/kafka/util/SimplePartitioner.java @@ -0,0 +1,36 @@ +package com.casic.kafka.util; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +import java.util.Map; + +public class SimplePartitioner implements Partitioner { + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + int partition = 0; + String partitionKey = (String) key; + int numPartitions = cluster.partitionsForTopic(topic).size(); + + try { + //指定分区逻辑,也就是key + partition = Integer.parseInt(partitionKey) % numPartitions; + } catch (NumberFormatException ne) { + //如果解析失败,都分配到0分区上 + partition = 0; + } + + return partition; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +} diff --git a/src/main/java/com/casic/kafka/util/SpringUtils.java b/src/main/java/com/casic/kafka/util/SpringUtils.java new file mode 100644 index 0000000..3fbbdf5 --- /dev/null +++ b/src/main/java/com/casic/kafka/util/SpringUtils.java @@ -0,0 +1,37 @@ +package com.casic.kafka.util; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; + +/** + * spring获取bean工具类 + * +* @author 作者 owen E-mail: 624191343@qq.com + * @version 创建时间:2018年3月20日 下午10:13:18 类说明 + * + */ +@Component +public class SpringUtils implements ApplicationContextAware { + + private static ApplicationContext applicationContext = null; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringUtils.applicationContext = applicationContext; + } + + public static T getBean(Class cla) { + return applicationContext.getBean(cla); + } + + public static T getBean(String name, Class cal) { + return applicationContext.getBean(name, cal); + } + + public static String getProperty(String key) { + return applicationContext.getBean(Environment.class).getProperty(key); + } +} diff --git a/pom.xml b/pom.xml index 6d8a31d..21ec64a 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,7 @@ UTF-8 UTF-8 1.8 + 2.4.0-hw-ei-312005 @@ -25,12 +26,6 @@ org.springframework.boot - spring-boot-starter-tomcat - 2.4.5 - - - - org.springframework.boot spring-boot-starter-web 2.4.5 @@ -56,18 +51,6 @@ - org.apache.httpcomponents - httpclient - 4.5.9 - - - - org.apache.httpcomponents - httpcore - 4.4.8 - - - org.apache.commons commons-lang3 3.1 @@ -79,11 +62,6 @@ 2.3.0 - - org.postgresql - postgresql - 42.2.19 - org.projectlombok @@ -105,19 +83,11 @@ - redis.clients - jedis - 3.1.0 - jar - - - org.springframework.data spring-data-redis 2.4.8 - com.oracle.database.jdbc ojdbc6 @@ -130,8 +100,71 @@ 1.0.0 + 
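Reviewer note: the consumer half of these properties is never exercised in this patch. For reference, a minimal poll loop against consumerInitProperties() would look roughly like the sketch below. This is illustrative only, not part of the change; ConsumerSketch is a hypothetical class assumed to sit in com.casic.kafka.util next to KafkaUtils and KafkaProperties.

package com.casic.kafka.util;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;

public class ConsumerSketch {
    public static void main(String[] args) throws Exception {
        // In security mode, Kerberos login must be prepared first,
        // exactly as Producer's static block does
        if (LoginUtil.isSecurityModel()) {
            LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE);
        }
        // Reuses the patch's own bootstrapping, Kerberos settings included
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaUtils.consumerInitProperties())) {
            consumer.subscribe(Collections.singletonList(KafkaProperties.DATA_TOPIC));
            while (true) {
                // Poll and print whatever the yizhuang jobs publish
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + ":" + record.offset() + " " + record.value());
                }
            }
        }
    }
}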
diff --git a/src/main/java/com/casic/dao/DayDataDao.java b/src/main/java/com/casic/dao/DayDataDao.java
index 11b5eaf..2674497 100644
--- a/src/main/java/com/casic/dao/DayDataDao.java
+++ b/src/main/java/com/casic/dao/DayDataDao.java
@@ -11,11 +11,16 @@
 @Mapper
 public interface DayDataDao {

-    List<Map<String, Object>> getWellCoverByType();
-    List<Map<String, Object>> getStandardLiquid();
-    List<Map<String, Object>> getLiquidByType();
-    List<Map<String, Object>> getWellLocalByType();
+    List<Map<String, Object>> getWellCoverByType();

-    List<Map<String, Object>> getHarmfulCode();
+    int clearOnline(@Param("devcode") String devcode);
+
+    List<Map<String, Object>> getStandardLiquid();
+
+    List<Map<String, Object>> getLiquidByType();
+
+    List<Map<String, Object>> getWellLocalByType();
+
+    List<Map<String, Object>> getHarmfulCode();
 }
diff --git a/src/main/java/com/casic/entity/StandardData.java b/src/main/java/com/casic/entity/StandardData.java
new file mode 100644
index 0000000..fe7bdad
--- /dev/null
+++ b/src/main/java/com/casic/entity/StandardData.java
@@ -0,0 +1,29 @@
+package com.casic.entity;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+@Data
+public class StandardData {
+
+    @JSONField(name = "DevID")
+    private String DevID;
+    @JSONField(name = "DevType")
+    private String DevType;
+    @JSONField(name = "Provider")
+    private String Provider;
+    @JSONField(name = "Status")
+    private String Status;
+    @JSONField(name = "LogTime")
+    private String LogTime;
+
+    public StandardData(String DevID, String DevType, String Status, String LogTime) {
+        this.DevID = DevID;
+        this.DevType = DevType;
+        this.Provider = "Provider-ChangFeng";
+        this.Status = Status;
+        this.LogTime = LogTime;
+    }
+
+}
diff --git a/src/main/java/com/casic/kafka/Producer.java b/src/main/java/com/casic/kafka/Producer.java
new file mode 100644
index 0000000..7ef048f
--- /dev/null
+++ b/src/main/java/com/casic/kafka/Producer.java
@@ -0,0 +1,52 @@
+package com.casic.kafka;
+
+import com.casic.kafka.util.KafkaProperties;
+import com.casic.kafka.util.KafkaUtils;
+import com.casic.kafka.util.LoginUtil;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class Producer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
+    private static KafkaProducer<String, String> producer;
+
+    static {
+        try {
+            if (LoginUtil.isSecurityModel()) {
+                LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE);
+            }
+            Properties props = KafkaUtils.producerInitProperties();
+            producer = new KafkaProducer<>(props);
+        } catch (IOException ex) {
+            // Was an empty catch block; at least record the failed security preparation
+            LOG.error("Kafka security prepare failed.", ex);
+        }
+    }
+
+    public static void send(String content, String topic) {
+        LOG.debug("producer start.");
+        if (producer == null) {
+            // NOTE: with security authentication enabled, replace the account
+            // with your own machine-machine account
+            Properties props = KafkaUtils.producerInitProperties();
+            producer = new KafkaProducer<>(props);
+        }
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "", content);
+        try {
+            // Synchronous send
+            producer.send(record).get();
+            LOG.info("Sent to Kafka topic " + topic + ", payload: " + content);
+        } catch (InterruptedException ie) {
+            LOG.info("The InterruptedException occurred : {}.", ie);
+        } catch (ExecutionException ee) {
+            LOG.info("The ExecutionException occurred : {}.", ee);
+        }
+//        producer.close();
+//        LOG.info("Sent to Kafka topic " + topic + ", payload: " + content);
+    }
+}
diff --git a/src/main/java/com/casic/kafka/constant/TopicConstant.java b/src/main/java/com/casic/kafka/constant/TopicConstant.java
new file mode 100644
index 0000000..cebd2b2
--- /dev/null
+++ b/src/main/java/com/casic/kafka/constant/TopicConstant.java
@@ -0,0 +1,8 @@
+package com.casic.kafka.constant;
+
+public class TopicConstant {
+
+    public static final String DATA_TOPIC = "dataTopic";
+    public static final String ALARM_TOPIC = "alarmTopic";
+
+}
diff --git a/src/main/java/com/casic/kafka/util/KafkaProperties.java b/src/main/java/com/casic/kafka/util/KafkaProperties.java
new file mode 100644
index 0000000..dedf438
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/KafkaProperties.java
@@ -0,0 +1,129 @@
+package com.casic.kafka.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public final class KafkaProperties {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
+
+    // Topic names; in security mode an administrator must grant the current user access to them
+    public final static String DATA_TOPIC = "TEMPSTORE_8204";
+    public final static String ALARM_TOPIC = "MSGQUEUE_8287";
+
+    /**
+     * Keytab file name of the user's own machine-machine account
+     */
+    public static final String USER_KEYTAB_FILE = "user.keytab";
+
+    /**
+     * Principal name of the user's own machine-machine account
+     */
+    public static final String USER_PRINCIPAL = "kafkauser";
+
+    private static Properties serverProps = new Properties();
+
+    private static Properties producerProps = new Properties();
+
+    private static Properties consumerProps = new Properties();
+
+    private static Properties clientProps = new Properties();
+
+    private static KafkaProperties instance = null;
+
+    private static final String filePath = "D:\\casic203\\software\\software\\data-creater\\kafka\\";
+
+    private KafkaProperties() {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        try {
+            File proFile = new File(filePath + "producer.properties");
+
+            if (proFile.exists()) {
+                producerProps.load(new FileInputStream(filePath + "producer.properties"));
+            }
+
+            // Was checking "producer.properties" again by copy-paste mistake;
+            // the consumer config is the file whose existence matters here
+            File conFile = new File(filePath + "consumer.properties");
+
+            if (conFile.exists()) {
+                consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
+            }
+
+            File serFile = new File(filePath + "server.properties");
+
+            if (serFile.exists()) {
+                serverProps.load(new FileInputStream(filePath + "server.properties"));
+            }
+
+            File cliFile = new File(filePath + "client.properties");
+
+            if (cliFile.exists()) {
+                clientProps.load(new FileInputStream(filePath + "client.properties"));
+            }
+        } catch (IOException e) {
+            LOG.info("The Exception occurred.", e);
+        }
+    }
+
+    public synchronized static KafkaProperties getInstance() {
+        if (null == instance) {
+            instance = new KafkaProperties();
+        }
+        return instance;
+    }
+
+    /**
+     * Get a configuration value
+     *
+     * @param key      property key
+     * @param defValue default value
+     * @return
+     */
+    public String getValues(String key, String defValue) {
+        String rtValue = null;
+
+        if (null == key) {
+            LOG.error("key is null");
+        } else {
+            rtValue = getPropertiesValue(key);
+        }
+
+        if (null == rtValue) {
+            LOG.warn("KafkaProperties.getValues return null, key is " + key);
+            rtValue = defValue;
+        }
+
+        LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
+
+        return rtValue;
+    }
+
+    /**
+     * Look up a key, starting with server.properties
+     *
+     * @param key
+     * @return
+     */
+    private String getPropertiesValue(String key) {
+        String rtValue = serverProps.getProperty(key);
+
+        // Not in server.properties, fall back to producer.properties
+        if (null == rtValue) {
+            rtValue = producerProps.getProperty(key);
+        }
+
+        // Not in producer.properties, fall back to consumer.properties
+        if (null == rtValue) {
+            rtValue = consumerProps.getProperty(key);
+        }
+
+        // Not in consumer.properties, fall back to client.properties
+        if (null == rtValue) {
+            rtValue = clientProps.getProperty(key);
+        }
+
+        return rtValue;
+    }
+}
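Reviewer note: putting the new pieces together, a send looks like the following sketch. Illustrative only — SendSketch is a hypothetical class, and the devcode, status list and timestamp are made-up values; the Key/Value map shape mirrors what DayDataProvider builds below.

package com.casic.kafka;

import com.alibaba.fastjson.JSON;
import com.casic.entity.StandardData;
import com.casic.kafka.util.KafkaProperties;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SendSketch {
    public static void main(String[] args) {
        // Status payload in the Key/Value shape used by DayDataProvider
        List<Map<String, Object>> status = new ArrayList<>();
        Map<String, Object> level = new HashMap<>();
        level.put("Key", "Level");
        level.put("Value", "2.13");
        status.add(level);
        // Illustrative devcode and log time
        StandardData data = new StandardData("312023001", "LiquidLevel",
                JSON.toJSONString(status), "2021-07-01 08:30:00");
        // Producer.send takes (content, topic); its static initializer
        // already handles the Kerberos login when security mode is on
        Producer.send(JSON.toJSONString(data), KafkaProperties.DATA_TOPIC);
    }
}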
filePath + "kafkaSecurityMode"; + + Properties securityProps = new Properties(); + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return true; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } +} diff --git a/src/main/java/com/casic/kafka/util/SimplePartitioner.java b/src/main/java/com/casic/kafka/util/SimplePartitioner.java new file mode 100644 index 0000000..62bfa79 --- /dev/null +++ b/src/main/java/com/casic/kafka/util/SimplePartitioner.java @@ -0,0 +1,36 @@ +package com.casic.kafka.util; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +import java.util.Map; + +public class SimplePartitioner implements Partitioner { + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + int partition = 0; + String partitionKey = (String) key; + int numPartitions = cluster.partitionsForTopic(topic).size(); + + try { + //指定分区逻辑,也就是key + partition = Integer.parseInt(partitionKey) % numPartitions; + } catch (NumberFormatException ne) { + //如果解析失败,都分配到0分区上 + partition = 0; + } + + return partition; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +} diff --git a/src/main/java/com/casic/kafka/util/SpringUtils.java b/src/main/java/com/casic/kafka/util/SpringUtils.java new file mode 100644 index 0000000..3fbbdf5 --- /dev/null +++ b/src/main/java/com/casic/kafka/util/SpringUtils.java @@ -0,0 +1,37 @@ +package com.casic.kafka.util; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; + +/** + * spring获取bean工具类 + * +* @author 作者 owen E-mail: 624191343@qq.com + * @version 创建时间:2018年3月20日 下午10:13:18 类说明 + * + */ +@Component +public class SpringUtils implements ApplicationContextAware { + + private static ApplicationContext applicationContext = null; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringUtils.applicationContext = applicationContext; + } + + public static T getBean(Class cla) { + return applicationContext.getBean(cla); + } + + public static T getBean(String name, Class cal) { + return applicationContext.getBean(name, cal); + } + + public static String getProperty(String key) { + return applicationContext.getBean(Environment.class).getProperty(key); + } +} diff --git a/src/main/java/com/casic/service/impl/DayDataProvider.java b/src/main/java/com/casic/service/impl/DayDataProvider.java index 33d6588..db924e4 100644 --- a/src/main/java/com/casic/service/impl/DayDataProvider.java +++ b/src/main/java/com/casic/service/impl/DayDataProvider.java @@ -1,31 +1,29 @@ package com.casic.service.impl; import com.alibaba.druid.util.StringUtils; -import com.casic.config.DeviceTypeConfig; +import com.alibaba.fastjson.JSON; +import com.casic.config.KafkaTopicConfig; import com.casic.dao.*; import com.casic.entity.*; +import com.casic.kafka.Producer; import 
com.casic.service.DayDataService; -import com.casic.util.SnowBizPhyId; -import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; -import javax.annotation.Resource; import java.text.DecimalFormat; +import java.text.SimpleDateFormat; import java.util.*; @Service +@RequiredArgsConstructor public class DayDataProvider implements DayDataService { - @Resource - private DayDataDao dayDataDao; - @Resource - private WellLocalDataMapper wellLocalDataMapper; - @Resource - private WellInfoDao wellInfoDao; - @Resource - private LiquidDataMapper liquidDataMapper; - @Resource - private HarmfulDataMapper harmfulDataMapper; + private final DayDataDao dayDataDao; + private final WellLocalDataMapper wellLocalDataMapper; + private final WellInfoDao wellInfoDao; + private final LiquidDataMapper liquidDataMapper; + private final HarmfulDataMapper harmfulDataMapper; + private final KafkaTopicConfig kafkaTopicConfig; @Override public void wellLocalData() { @@ -84,13 +82,13 @@ String switchs = String.valueOf(devcodeList.get(i).get("switch")); String devcode = String.valueOf(devcodeList.get(i).get("DEVCODE")); String wellCode = String.valueOf(devcodeList.get(i).get("wellCode")); - harmfulDataCreator(devcode, wellCode,switchs); + harmfulDataCreator(devcode, wellCode, switchs); } } } - private void harmfulDataCreator(String devcode, String wellCode,String switchs) { + private void harmfulDataCreator(String devcode, String wellCode, String switchs) { DecimalFormat df = new DecimalFormat("0.00"); HarmfulData harmfulData = new HarmfulData(); harmfulData.setWellCode(wellCode); @@ -113,17 +111,30 @@ */ private void wellDataCreator(String devcode, String wellCode) { WellInfo wellInfo = new WellInfo(); -// wellInfo.setId(SnowBizPhyId.getBizPhyId()); +// wellInfo.setId(SnowBizPhyId.getBizPhyId()); wellInfo.setWellCode(wellCode); wellInfo.setDevcode(devcode); + List> mapList2 = new ArrayList<>(); + Map cellDataMap = new HashMap<>(); + cellDataMap.put("Key", "Power"); + cellDataMap.put("Value", "-"); + Map dataMap = new HashMap<>(); + dataMap.put("Key", "Status"); + dataMap.put("Value", "心跳"); + mapList2.add(cellDataMap); + mapList2.add(dataMap); wellInfo.setDescn("心跳"); wellInfo.setStatus("0"); - Long initTime = new Date().getTime(); + long initTime = System.currentTimeMillis(); + initTime += random.nextInt(64800000); + wellInfo.setLogtime(new Date()); for (int i = 0; i < 6; i++) { wellInfoDao.insert(wellInfo); - initTime += random.nextInt(200000); + initTime -= random.nextInt(200000); wellInfo.setLogtime(new Date(initTime)); + dayDataDao.clearOnline(devcode); + sendData(wellInfo.getLogtime(), devcode, mapList2); } } @@ -133,35 +144,57 @@ private void liquidDataCreator(String devcode, String wellCode, Float liquidValue) { LiquidData liquidData = new LiquidData(); DecimalFormat df = new DecimalFormat("0.00"); -// liquidData.setId(SnowBizPhyId.getBizPhyId()); +// liquidData.setId(SnowBizPhyId.getBizPhyId()); liquidData.setWellCode(wellCode); liquidData.setDevcode(devcode); liquidData.setCell("22"); - liquidData.setUptime(new Date()); - Long initTime = new Date().getTime(); + long initTime = System.currentTimeMillis(); + initTime += random.nextInt(64800000); + liquidData.setUptime(new Date(initTime)); + Map cellDataMap = new HashMap<>(); + cellDataMap.put("Key", "Power"); + cellDataMap.put("Value", "22"); + Map dataMap = new HashMap<>(); + dataMap.put("Key", "Level"); for (int i = 0; i < 6; i++) { + List> mapList2 = new ArrayList<>(); Double errorData = 
Math.random() * 0.2 - 0.1; liquidData.setLiquiddata(df.format(liquidValue + errorData)); - initTime += random.nextInt(200000); + dataMap.put("Value", liquidData.getLiquiddata()); + mapList2.add(dataMap); + mapList2.add(cellDataMap); + initTime -= random.nextInt(200000); liquidData.setLogtime(new Date(initTime)); + dayDataDao.clearOnline(devcode); liquidDataMapper.insert(liquidData); + sendData(liquidData.getLogtime(), devcode, mapList2); } } + private void sendData(Date logtime, String devcode, List> mapList2) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String logTime = sdf.format(logtime); + if (!StringUtils.isEmpty(kafkaTopicConfig.getKafkaSendTopic())) { + StandardData standardData = new StandardData(devcode, "LiquidLevel", JSON.toJSONString(mapList2), logTime); + Producer.send(JSON.toJSONString(standardData), kafkaTopicConfig.getKafkaSendTopic()); + } + } + + /** * 4天发六次 */ private void CreateWellDataLocal(String devcode, String wellCode) { WellLocalData wellLocalData = new WellLocalData(); DecimalFormat df = new DecimalFormat("0.000000"); -// wellLocalData.setId(SnowBizPhyId.getBizPhyId()); +// wellLocalData.setId(SnowBizPhyId.getBizPhyId()); wellLocalData.setWellCode(wellCode); wellLocalData.setDevcode(devcode); Double lng = 116.54 + Math.random() * 0.02 - 0.01; Double lat = 39.79 + Math.random() * 0.02 - 0.01; wellLocalData.setLng(df.format(lng)); wellLocalData.setLat(df.format(lat)); - Long initTime = new Date().getTime(); + long initTime = System.currentTimeMillis(); for (int i = 0; i < 6; i++) { initTime += random.nextInt(200000); wellLocalData.setLogtime(new Date(initTime)); diff --git a/pom.xml b/pom.xml index 6d8a31d..21ec64a 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,7 @@ UTF-8 UTF-8 1.8 + 2.4.0-hw-ei-312005 @@ -25,12 +26,6 @@ org.springframework.boot - spring-boot-starter-tomcat - 2.4.5 - - - - org.springframework.boot spring-boot-starter-web 2.4.5 @@ -56,18 +51,6 @@ - org.apache.httpcomponents - httpclient - 4.5.9 - - - - org.apache.httpcomponents - httpcore - 4.4.8 - - - org.apache.commons commons-lang3 3.1 @@ -79,11 +62,6 @@ 2.3.0 - - org.postgresql - postgresql - 42.2.19 - org.projectlombok @@ -105,19 +83,11 @@ - redis.clients - jedis - 3.1.0 - jar - - - org.springframework.data spring-data-redis 2.4.8 - com.oracle.database.jdbc ojdbc6 @@ -130,8 +100,71 @@ 1.0.0 + + org.apache.kafka + kafka-clients + ${kafka.version} + + + xml-apis + xml-apis + + + + + + xml-apis + xml-apis + 1.4.01 + + + + org.apache.kafka + kafka-streams + ${kafka.version} + + + + org.apache.kafka + kafka-clients + + + org.slf4j + slf4j-api + + + org.apache.kafka + connect-json + + + + + + + bigdata + http://wlg1.artifactory.cd-cloud-artifact.tools.huawei.com/artifactory/cbu-maven-public/ + + + huaweicloudsdk + https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/ + + true + + + true + + + + + central + Maven Central + https://repo1.maven.org/maven2/ + + + + diff --git a/src/main/java/com/casic/config/KafkaTopicConfig.java b/src/main/java/com/casic/config/KafkaTopicConfig.java new file mode 100644 index 0000000..293f173 --- /dev/null +++ b/src/main/java/com/casic/config/KafkaTopicConfig.java @@ -0,0 +1,19 @@ +package com.casic.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +public class KafkaTopicConfig { + + @Value("${casic.data.topic}") + private String kafkaSendTopic; + + @Value("${casic.alarm.topic}") + private String 
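Reviewer note: the wire format produced by sendData is the StandardData JSON, with the Status field holding a nested, stringified Key/Value array. A standalone sketch (hypothetical PayloadSketch class, made-up values) that prints an example message:

package com.casic.entity;

import com.alibaba.fastjson.JSON;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PayloadSketch {
    public static void main(String[] args) {
        // Same Key/Value maps liquidDataCreator assembles per reading
        List<Map<String, Object>> mapList2 = new ArrayList<>();
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("Key", "Level");
        dataMap.put("Value", "2.13");
        Map<String, Object> cellDataMap = new HashMap<>();
        cellDataMap.put("Key", "Power");
        cellDataMap.put("Value", "22");
        mapList2.add(dataMap);
        mapList2.add(cellDataMap);
        StandardData data = new StandardData("312023001", "LiquidLevel",
                JSON.toJSONString(mapList2), "2021-07-01 08:30:00");
        // Prints something like:
        // {"DevID":"312023001","DevType":"LiquidLevel","LogTime":"2021-07-01 08:30:00",
        //  "Provider":"Provider-ChangFeng","Status":"[{\"Key\":\"Level\",\"Value\":\"2.13\"},{\"Key\":\"Power\",\"Value\":\"22\"}]"}
        System.out.println(JSON.toJSONString(data));
    }
}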
kafkaAlarmSendTopic; + + @Value("${casic.data.kafka-Kerb-Url}") + private String kafkaKerbUrl; +} diff --git a/src/main/java/com/casic/config/task/TaskConfigurer.java b/src/main/java/com/casic/config/task/TaskConfigurer.java index d126324..cb66285 100644 --- a/src/main/java/com/casic/config/task/TaskConfigurer.java +++ b/src/main/java/com/casic/config/task/TaskConfigurer.java @@ -22,10 +22,10 @@ @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { - TriggerTask triggrtTask = new TriggerTask(wellLocalData(), - triggerContext -> new CronTrigger(timeConfig.getWellLocalTime()).nextExecutionTime(triggerContext)); - taskRegistrar.addTriggerTask(triggrtTask); - triggrtTask = new TriggerTask(wellCoverData(), +// TriggerTask triggrtTask = new TriggerTask(wellLocalData(), +// triggerContext -> new CronTrigger(timeConfig.getWellLocalTime()).nextExecutionTime(triggerContext)); +// taskRegistrar.addTriggerTask(triggrtTask); + TriggerTask triggrtTask = new TriggerTask(wellCoverData(), triggerContext -> new CronTrigger(timeConfig.getWellCoverTime()).nextExecutionTime(triggerContext)); taskRegistrar.addTriggerTask(triggrtTask); triggrtTask = new TriggerTask(liquidData(), diff --git a/src/main/java/com/casic/dao/DayDataDao.java b/src/main/java/com/casic/dao/DayDataDao.java index 11b5eaf..2674497 100644 --- a/src/main/java/com/casic/dao/DayDataDao.java +++ b/src/main/java/com/casic/dao/DayDataDao.java @@ -11,11 +11,16 @@ @Mapper public interface DayDataDao { - List> getWellCoverByType(); - List> getStandardLiquid(); - List> getLiquidByType(); - List> getWellLocalByType(); + List> getWellCoverByType(); - List> getHarmfulCode(); + int clearOnline(@Param("devcode") String devcode); + + List> getStandardLiquid(); + + List> getLiquidByType(); + + List> getWellLocalByType(); + + List> getHarmfulCode(); } diff --git a/src/main/java/com/casic/entity/StandardData.java b/src/main/java/com/casic/entity/StandardData.java new file mode 100644 index 0000000..fe7bdad --- /dev/null +++ b/src/main/java/com/casic/entity/StandardData.java @@ -0,0 +1,29 @@ +package com.casic.entity; + +import com.alibaba.fastjson.annotation.JSONField; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class StandardData { + + @JSONField(name="DevID") + private String DevID; + @JSONField(name="DevType") + private String DevType; + @JSONField(name="Provider") + private String Provider; + @JSONField(name="Status") + private String Status; + @JSONField(name="LogTime") + private String LogTime; + + public StandardData(String DevID, String DevType, String Status, String LogTime) { + this.DevID = DevID; + this.DevType = DevType; + this.Provider = "Provider-ChangFeng"; + this.Status = Status; + this.LogTime = LogTime; + } + +} diff --git a/src/main/java/com/casic/kafka/Producer.java b/src/main/java/com/casic/kafka/Producer.java new file mode 100644 index 0000000..7ef048f --- /dev/null +++ b/src/main/java/com/casic/kafka/Producer.java @@ -0,0 +1,52 @@ +package com.casic.kafka; + +import com.casic.kafka.util.KafkaProperties; +import com.casic.kafka.util.KafkaUtils; +import com.casic.kafka.util.LoginUtil; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +public class Producer { + + private static final Logger LOG = LoggerFactory.getLogger(Producer.class); + 
private static KafkaProducer producer; + + static { + try{ + if (LoginUtil.isSecurityModel()) { + LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE); + } + Properties props = KafkaUtils.producerInitProperties(); + producer = new KafkaProducer(props); + }catch (IOException ex){ + + } + } + + public static void send(String content, String topic){ + LOG.debug("producer start."); + if (producer == null) { + //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + Properties props = KafkaUtils.producerInitProperties(); + producer = new KafkaProducer(props); + } + ProducerRecord record = new ProducerRecord(topic, "", content); + try { + // 同步发送 + producer.send(record).get(); + LOG.info("主题为" + topic + ":kafka发送数据内容-------" + content); + } catch (InterruptedException ie) { + LOG.info("The InterruptedException occured : {}.", ie); + } catch (ExecutionException ee) { + LOG.info("The ExecutionException occured : {}.", ee); + } +// producer.close(); +// LOG.info("主题为" + topic + ":kafka发送数据内容-------" + content); + } +} diff --git a/src/main/java/com/casic/kafka/constant/TopicConstant.java b/src/main/java/com/casic/kafka/constant/TopicConstant.java new file mode 100644 index 0000000..cebd2b2 --- /dev/null +++ b/src/main/java/com/casic/kafka/constant/TopicConstant.java @@ -0,0 +1,8 @@ +package com.casic.kafka.constant; + +public class TopicConstant { + + public static final String DATA_TOPIC="dataTopic"; + public static final String ALARM_TOPIC="alarmTopic"; + +} diff --git a/src/main/java/com/casic/kafka/util/KafkaProperties.java b/src/main/java/com/casic/kafka/util/KafkaProperties.java new file mode 100644 index 0000000..dedf438 --- /dev/null +++ b/src/main/java/com/casic/kafka/util/KafkaProperties.java @@ -0,0 +1,129 @@ +package com.casic.kafka.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +public final class KafkaProperties { + private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class); + + // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 + public final static String DATA_TOPIC = "TEMPSTORE_8204"; + public final static String ALARM_TOPIC = "MSGQUEUE_8287"; + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + public static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + public static final String USER_PRINCIPAL = "kafkauser"; + + private static Properties serverProps = new Properties(); + + private static Properties producerProps = new Properties(); + + private static Properties consumerProps = new Properties(); + + private static Properties clientProps = new Properties(); + + private static KafkaProperties instance = null; + + private static final String filePath = "D:\\casic203\\software\\software\\data-creater\\kafka\\"; + private KafkaProperties() { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + try { + File proFile = new File(filePath + "producer.properties"); + + if (proFile.exists()) { + producerProps.load(new FileInputStream(filePath + "producer.properties")); + } + + File conFile = new File(filePath + "producer.properties"); + + if (conFile.exists()) { + consumerProps.load(new FileInputStream(filePath + "consumer.properties")); + } + + File serFile = new File(filePath + "server.properties"); + + if (serFile.exists()) { + serverProps.load(new FileInputStream(filePath + "server.properties")); + } + + 
File cliFile = new File(filePath + "client.properties"); + + if (cliFile.exists()) { + clientProps.load(new FileInputStream(filePath + "client.properties")); + } + } catch (IOException e) { + LOG.info("The Exception occured.", e); + } + } + + public synchronized static KafkaProperties getInstance() { + if (null == instance) { + instance = new KafkaProperties(); + } + return instance; + } + + /** + * 获取参数值 + * + * @param key properites的key值 + * @param defValue 默认值 + * @return + */ + public String getValues(String key, String defValue) { + String rtValue = null; + + if (null == key) { + LOG.error("key is null"); + } else { + rtValue = getPropertiesValue(key); + } + + if (null == rtValue) { + LOG.warn("KafkaProperties.getValues return null, key is " + key); + rtValue = defValue; + } + + LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue); + + return rtValue; + } + + /** + * 根据key值获取server.properties的值 + * + * @param key + * @return + */ + private String getPropertiesValue(String key) { + String rtValue = serverProps.getProperty(key); + + // server.properties中没有,则再向producer.properties中获取 + if (null == rtValue) { + rtValue = producerProps.getProperty(key); + } + + // producer中没有,则再向consumer.properties中获取 + if (null == rtValue) { + rtValue = consumerProps.getProperty(key); + } + + // consumer没有,则再向client.properties中获取 + if (null == rtValue) { + rtValue = clientProps.getProperty(key); + } + + return rtValue; + } +} diff --git a/src/main/java/com/casic/kafka/util/KafkaUtils.java b/src/main/java/com/casic/kafka/util/KafkaUtils.java new file mode 100644 index 0000000..bbf6830 --- /dev/null +++ b/src/main/java/com/casic/kafka/util/KafkaUtils.java @@ -0,0 +1,136 @@ +package com.casic.kafka.util; + + +import java.util.Properties; + +public class KafkaUtils { + + + // 一次请求的最大等待时间(Ms) + private final int waitTime = 1000; + + // Broker连接地址 + + // Broker连接地址 + private final static String BOOTSTRAP_SERVER = "bootstrap.servers"; + + // Group id + private final static String GROUP_ID = "group.id"; + + // 消息内容使用的反序列化类 + private final static String VALUE_DESERIALIZER = "value.deserializer"; + + // 消息Key值使用的反序列化类 + private final static String KEY_DESERIALIZER = "key.deserializer"; + + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + private final static String SECURITY_PROTOCOL = "security.protocol"; + + // 服务名 + private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"; + + // 域名 + private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name"; + + // 是否自动提交offset + private final static String ENABLE_AUTO_COMMIT = "enable.auto.commit"; + + // 自动提交offset的时间间隔 + private final static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"; + + // 会话超时时间 + private final static String SESSION_TIMEOUT_MS = "session.timeout.ms"; + + // 客户端ID + private final static String CLIENT_ID = "client.id"; + + // Key序列化类 + private final static String KEY_SERIALIZER = "key.serializer"; + + // Value序列化类 + private final static String VALUE_SERIALIZER = "value.serializer"; + + + // 分区类名 + private final static String PARTITIONER_NAME = "partitioner.class"; + + // 默认发送100条消息 + private final static int MESSAGE_NUM = 100; + + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "kafkauser"; + + + public static Properties consumerInitProperties() { + Properties props = new Properties(); + KafkaProperties kafkaProc = KafkaProperties.getInstance(); + + 
// Broker连接地址 + props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); + // Group id + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + // 是否自动提交offset + props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); + // 自动提交offset的时间间隔 + props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000")); + // 会话超时时间 + props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000")); + // 消息Key值使用的反序列化类 + props.put(KEY_DESERIALIZER, + kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); + // 消息内容使用的反序列化类 + props.put(VALUE_DESERIALIZER, + kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); + // 安全协议类型 + props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); + +// props.put(SASL_MECHANISM, "GSSAPI"); + // 服务名 + props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); + // 域名 + props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); + +// System.setProperty("java.security.auth.login.config","D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\77042.jaas.conf"); + + return props; + } + + public static Properties producerInitProperties() { + Properties props = new Properties(); + KafkaProperties kafkaProc = KafkaProperties.getInstance(); + + // Broker地址列表 + props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); + // 客户端ID + props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); + // Key序列化类 + props.put(KEY_SERIALIZER, + kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); + // Value序列化类 + props.put(VALUE_SERIALIZER, + kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); + // 服务名 + props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); + // 域名 + props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); + // 分区类名 + props.put(PARTITIONER_NAME, + kafkaProc.getValues(PARTITIONER_NAME, "com.casic.kafka.util.SimplePartitioner")); + + System.setProperty("java.security.auth.login.config","D:\\casic203\\software\\software\\data-creater\\kafka\\kafkaClient.jaas.conf"); + + return props; + } + + +} diff --git a/src/main/java/com/casic/kafka/util/LoginUtil.java b/src/main/java/com/casic/kafka/util/LoginUtil.java new file mode 100644 index 0000000..0cf3459 --- /dev/null +++ b/src/main/java/com/casic/kafka/util/LoginUtil.java @@ -0,0 +1,250 @@ +package com.casic.kafka.util; + +import com.casic.config.KafkaTopicConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +public class LoginUtil { + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); + + /** + * no JavaDoc + */ + public enum Module { + STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); + + private String name; + + private Module(String name) { + this.name = name; + } + + public String getName() { + return name; + } + } + + /** + * line operator string + */ + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + /** + * jaas file postfix + */ + private static final 
String JAAS_POSTFIX = ".jaas.conf";
+
+    /**
+     * is IBM jdk or not
+     */
+    private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
+
+    /**
+     * IBM jdk login module
+     */
+    private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
+
+    /**
+     * oracle jdk login module
+     */
+    private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
+
+    /**
+     * Zookeeper quorum principal.
+     */
+    public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
+
+    /**
+     * java security krb5 file path
+     */
+    public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
+
+    /**
+     * java security login file path
+     */
+    public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
+
+    private static String filePath;
+
+    static {
+        KafkaTopicConfig pcs = SpringUtils.getBean(KafkaTopicConfig.class);
+        filePath = pcs.getKafkaKerbUrl();
+    }
+
+    /**
+     * Write the jaas.conf file and register it with the JVM.
+     *
+     * @param principal
+     * @param keytabPath
+     * @throws IOException
+     */
+    public static void setJaasFile(String principal, String keytabPath)
+            throws IOException {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        // NOTE: hardcoded path shadows the configured kafkaKerbUrl above
+        String filePath = "D:\\casic203\\software\\software\\20200616\\yizhuang\\config\\kafkaClient";
+        String jaasPath = filePath + JAAS_POSTFIX;
+
+        // escape path separators on Windows
+        jaasPath = jaasPath.replace("\\", "\\\\");
+        // delete any stale jaas file first
+//        deleteJaasFile(jaasPath);
+        writeJaasFile(jaasPath, principal, keytabPath);
+        System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
+    }
+
+    /**
+     * Set the zookeeper server principal.
+     *
+     * @param zkServerPrincipal
+     * @throws IOException
+     */
+    public static void setZookeeperServerPrincipal(String zkServerPrincipal)
+            throws IOException {
+        System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
+        String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
+        if (ret == null) {
+            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
+        }
+        if (!ret.equals(zkServerPrincipal)) {
+            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + ", not " + zkServerPrincipal + ".");
+        }
+    }
+
+    /**
+     * Set the krb5 config file.
+     *
+     * @param krb5ConfFile
+     * @throws IOException
+     */
+    public static void setKrb5Config(String krb5ConfFile)
+            throws IOException {
+        System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
+        String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
+        if (ret == null) {
+            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
+        }
+        if (!ret.equals(krb5ConfFile)) {
+            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + ", not " + krb5ConfFile + ".");
+        }
+    }
+
+    /**
+     * Write the jaas file.
+     *
+     * @throws IOException on write failure
+     */
+    private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
+            throws IOException {
+        FileWriter writer = new FileWriter(new File(jaasPath));
+        try {
+            writer.write(getJaasConfContext(principal, keytabPath));
+            writer.flush();
+        } catch (IOException e) {
+            throw new IOException("Failed to create jaas.conf File");
+        } finally {
+            writer.close();
+        }
+    }
+
+    private static void deleteJaasFile(String jaasPath)
+            throws IOException {
+        File jaasFile = new File(jaasPath);
+        if (jaasFile.exists()) {
+            if (!jaasFile.delete()) {
+                throw new IOException("Failed to delete exists jaas file.");
+            }
+        }
+    }
+
+    private static String getJaasConfContext(String principal, String keytabPath) {
+        Module[] allModule = Module.values();
+        StringBuilder builder = new StringBuilder();
+        for (Module module : allModule) {
+            builder.append(getModuleContext(principal, keytabPath, module));
+        }
+        return builder.toString();
+    }
+
+    private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
+        StringBuilder builder = new StringBuilder();
+        if (IS_IBM_JDK) {
+            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
+            builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
+            builder.append("credsType=both").append(LINE_SEPARATOR);
+            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
+            builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
+            builder.append("debug=true;").append(LINE_SEPARATOR);
+            builder.append("};").append(LINE_SEPARATOR);
+        } else {
+            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
+            builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
+            builder.append("useKeyTab=true").append(LINE_SEPARATOR);
+            builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
+            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
+            builder.append("useTicketCache=false").append(LINE_SEPARATOR);
+            builder.append("storeKey=true").append(LINE_SEPARATOR);
+            builder.append("debug=true;").append(LINE_SEPARATOR);
+            builder.append("};").append(LINE_SEPARATOR);
+        }
+
+        return builder.toString();
+    }
+
+    public static void securityPrepare(String principal, String keyTabFile) throws IOException {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        String krbFile = filePath + "krb5.conf";
+        String userKeyTableFile = filePath + keyTabFile;
+        // escape path separators on Windows
+        userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
+        krbFile = krbFile.replace("\\", "\\\\");
+        principal += "@HADOOP.COM";
+        LoginUtil.setKrb5Config(krbFile);
+        LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
+        LoginUtil.setJaasFile(principal, userKeyTableFile);
+    }
+
+    /**
+     * Check security mode
+     *
+     * @return boolean
+     */
+    public static Boolean isSecurityModel() {
+        Boolean isSecurity = false;
+
+        String krbFilePath = filePath + "kafkaSecurityMode";
+
+        Properties securityProps = new Properties();
+        // marker file missing: treat as non-secure mode
+        if (!isFileExists(krbFilePath)) {
+            return isSecurity;
+        }
+        try {
+            securityProps.load(new FileInputStream(krbFilePath));
+
+            if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) {
+                isSecurity = true;
+            }
+        } catch (Exception e) {
+            LOG.info("The Exception occurred : {}.", e);
+        }
+
+        return isSecurity;
+    }
+
+    /*
+     * check whether a file exists
+     */
+    private static boolean isFileExists(String fileName) {
+        File file = new File(fileName);
+
+        return file.exists();
+    }
+}
diff --git a/src/main/java/com/casic/kafka/util/SimplePartitioner.java b/src/main/java/com/casic/kafka/util/SimplePartitioner.java
new file mode 100644
index 0000000..62bfa79
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/SimplePartitioner.java
@@ -0,0 +1,36 @@
+package com.casic.kafka.util;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import java.util.Map;
+
+public class SimplePartitioner implements Partitioner {
+
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        int partition = 0;
+        String partitionKey = (String) key;
+        int numPartitions = cluster.partitionsForTopic(topic).size();
+
+        try {
+            // partition by key: numeric keys are spread modulo the partition count
+            partition = Integer.parseInt(partitionKey) % numPartitions;
+        } catch (NumberFormatException ne) {
+            // keys that fail to parse all land on partition 0
+            partition = 0;
+        }
+
+        return partition;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+    }
+}
diff --git a/src/main/java/com/casic/kafka/util/SpringUtils.java b/src/main/java/com/casic/kafka/util/SpringUtils.java
new file mode 100644
index 0000000..3fbbdf5
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/SpringUtils.java
@@ -0,0 +1,37 @@
+package com.casic.kafka.util;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+/**
+ * Utility for looking up Spring beans from static (non-managed) code.
+ *
+ * @author owen E-mail: 624191343@qq.com
+ * @version created 2018-03-20 22:13
+ */
+@Component
+public class SpringUtils implements ApplicationContextAware {
+
+    private static ApplicationContext applicationContext = null;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        SpringUtils.applicationContext = applicationContext;
+    }
+
+    public static <T> T getBean(Class<T> cla) {
+        return applicationContext.getBean(cla);
+    }
+
+    public static <T> T getBean(String name, Class<T> cal) {
+        return applicationContext.getBean(name, cal);
+    }
+
+    public static String getProperty(String key) {
+        return applicationContext.getBean(Environment.class).getProperty(key);
+    }
+}
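Note: for reference, on an Oracle/Sun JDK LoginUtil.writeJaasFile emits one block per Module value (StormClient, KafkaClient, Client). With a hypothetical principal kafkauser@HADOOP.COM and keytab path /opt/keytabs/user.keytab, the KafkaClient section of the generated jaas.conf comes out flush-left, exactly as getModuleContext appends it:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/opt/keytabs/user.keytab"
    principal="kafkauser@HADOOP.COM"
    useTicketCache=false
    storeKey=true
    debug=true;
    };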
diff --git a/src/main/java/com/casic/service/impl/DayDataProvider.java b/src/main/java/com/casic/service/impl/DayDataProvider.java
index 33d6588..db924e4 100644
--- a/src/main/java/com/casic/service/impl/DayDataProvider.java
+++ b/src/main/java/com/casic/service/impl/DayDataProvider.java
@@ -1,31 +1,29 @@
 package com.casic.service.impl;
 
 import com.alibaba.druid.util.StringUtils;
-import com.casic.config.DeviceTypeConfig;
+import com.alibaba.fastjson.JSON;
+import com.casic.config.KafkaTopicConfig;
 import com.casic.dao.*;
 import com.casic.entity.*;
+import com.casic.kafka.Producer;
 import com.casic.service.DayDataService;
-import com.casic.util.SnowBizPhyId;
-import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.Resource;
 import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
 import java.util.*;
 
 @Service
+@RequiredArgsConstructor
 public class DayDataProvider implements DayDataService {
 
-    @Resource
-    private DayDataDao dayDataDao;
-    @Resource
-    private WellLocalDataMapper wellLocalDataMapper;
-    @Resource
-    private WellInfoDao wellInfoDao;
-    @Resource
-    private LiquidDataMapper liquidDataMapper;
-    @Resource
-    private HarmfulDataMapper harmfulDataMapper;
+    private final DayDataDao dayDataDao;
+    private final WellLocalDataMapper wellLocalDataMapper;
+    private final WellInfoDao wellInfoDao;
+    private final LiquidDataMapper liquidDataMapper;
+    private final HarmfulDataMapper harmfulDataMapper;
+    private final KafkaTopicConfig kafkaTopicConfig;
 
     @Override
     public void wellLocalData() {
@@ -84,13 +82,13 @@
             String switchs = String.valueOf(devcodeList.get(i).get("switch"));
             String devcode = String.valueOf(devcodeList.get(i).get("DEVCODE"));
             String wellCode = String.valueOf(devcodeList.get(i).get("wellCode"));
-            harmfulDataCreator(devcode, wellCode,switchs);
+            harmfulDataCreator(devcode, wellCode, switchs);
         }
     }
 
-    private void harmfulDataCreator(String devcode, String wellCode,String switchs) {
+    private void harmfulDataCreator(String devcode, String wellCode, String switchs) {
         DecimalFormat df = new DecimalFormat("0.00");
         HarmfulData harmfulData = new HarmfulData();
         harmfulData.setWellCode(wellCode);
@@ -113,17 +111,30 @@
      */
     private void wellDataCreator(String devcode, String wellCode) {
         WellInfo wellInfo = new WellInfo();
-//        wellInfo.setId(SnowBizPhyId.getBizPhyId());
+//        wellInfo.setId(SnowBizPhyId.getBizPhyId());
         wellInfo.setWellCode(wellCode);
         wellInfo.setDevcode(devcode);
+        List<Map<String, Object>> mapList2 = new ArrayList<>();
+        Map<String, Object> cellDataMap = new HashMap<>();
+        cellDataMap.put("Key", "Power");
+        cellDataMap.put("Value", "-");
+        Map<String, Object> dataMap = new HashMap<>();
+        dataMap.put("Key", "Status");
+        dataMap.put("Value", "心跳");
+        mapList2.add(cellDataMap);
+        mapList2.add(dataMap);
         wellInfo.setDescn("心跳");
         wellInfo.setStatus("0");
-        Long initTime = new Date().getTime();
+        long initTime = System.currentTimeMillis();
+        initTime += random.nextInt(64800000);
+        wellInfo.setLogtime(new Date());
         for (int i = 0; i < 6; i++) {
             wellInfoDao.insert(wellInfo);
-            initTime += random.nextInt(200000);
+            initTime -= random.nextInt(200000);
             wellInfo.setLogtime(new Date(initTime));
+            dayDataDao.clearOnline(devcode);
+            sendData(wellInfo.getLogtime(), devcode, mapList2);
         }
     }
 
@@ -133,35 +144,57 @@
     private void liquidDataCreator(String devcode, String wellCode, Float liquidValue) {
         LiquidData liquidData = new LiquidData();
         DecimalFormat df = new DecimalFormat("0.00");
-//        liquidData.setId(SnowBizPhyId.getBizPhyId());
+//        liquidData.setId(SnowBizPhyId.getBizPhyId());
         liquidData.setWellCode(wellCode);
         liquidData.setDevcode(devcode);
         liquidData.setCell("22");
-        liquidData.setUptime(new Date());
-        Long initTime = new Date().getTime();
+        long initTime = System.currentTimeMillis();
+        initTime += random.nextInt(64800000);
+        liquidData.setUptime(new Date(initTime));
+        Map<String, Object> cellDataMap = new HashMap<>();
+        cellDataMap.put("Key", "Power");
+        cellDataMap.put("Value", "22");
+        Map<String, Object> dataMap = new HashMap<>();
+        dataMap.put("Key", "Level");
         for (int i = 0; i < 6; i++) {
+            List<Map<String, Object>> mapList2 = new ArrayList<>();
             Double errorData = Math.random() * 0.2 - 0.1;
             liquidData.setLiquiddata(df.format(liquidValue + errorData));
-            initTime += random.nextInt(200000);
+            dataMap.put("Value", liquidData.getLiquiddata());
+            mapList2.add(dataMap);
+            mapList2.add(cellDataMap);
+            initTime -= random.nextInt(200000);
            liquidData.setLogtime(new Date(initTime));
+            dayDataDao.clearOnline(devcode);
             liquidDataMapper.insert(liquidData);
+            sendData(liquidData.getLogtime(), devcode, mapList2);
         }
     }
 
+    private void sendData(Date logtime, String devcode, List<Map<String, Object>> mapList2) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String logTime = sdf.format(logtime);
+        if (!StringUtils.isEmpty(kafkaTopicConfig.getKafkaSendTopic())) {
+            StandardData standardData = new StandardData(devcode, "LiquidLevel", JSON.toJSONString(mapList2), logTime);
+            Producer.send(JSON.toJSONString(standardData), kafkaTopicConfig.getKafkaSendTopic());
+        }
+    }
+
     /**
      * sent six times over four days
     */
     private void CreateWellDataLocal(String devcode, String wellCode) {
         WellLocalData wellLocalData = new WellLocalData();
         DecimalFormat df = new DecimalFormat("0.000000");
-//        wellLocalData.setId(SnowBizPhyId.getBizPhyId());
+//        wellLocalData.setId(SnowBizPhyId.getBizPhyId());
         wellLocalData.setWellCode(wellCode);
         wellLocalData.setDevcode(devcode);
         Double lng = 116.54 + Math.random() * 0.02 - 0.01;
         Double lat = 39.79 + Math.random() * 0.02 - 0.01;
         wellLocalData.setLng(df.format(lng));
         wellLocalData.setLat(df.format(lat));
-        Long initTime = new Date().getTime();
+        long initTime = System.currentTimeMillis();
         for (int i = 0; i < 6; i++) {
             initTime += random.nextInt(200000);
             wellLocalData.setLogtime(new Date(initTime));
diff --git a/src/main/resources/config/application.yml b/src/main/resources/config/application.yml
index 079d5b3..b199a3a 100644
--- a/src/main/resources/config/application.yml
+++ b/src/main/resources/config/application.yml
@@ -25,18 +25,17 @@
 flowable:
   checkProcessDefinitions: false  # do not validate process files
 casic:
+  alarm:
+    topic: MSGQUEUE_8287 # kafka publish topic for alarm messages
+  data:
+    topic: TEMPSTORE_8204 # kafka publish topic for monitoring data
+    kafka-Kerb-Url: D:\casic203\software\software\kafka-kaina\
   device:
-    redis:
-      invalid-time: 86400
-      host: 127.0.0.1
-      port: 6379
-      password: ew5T4K3#203lwh
-      config-prefix: 'Casic:'
     hour-types: 8
     day-types: 1,2,7
   task:
     cron: '0 0/1 * * * ? '
     interval:
-      one-day: '0 0/1 * * * ? '
-      three-day: '0 0 0 */3 * ? '
+      one-day: '0 0/1 0 */1 * ? '
+      three-day: '0 0/10 * * * ? '
       four-day: '0 0 0 */4 * ? '
\ No newline at end of file
diff --git a/src/main/java/com/casic/dao/DayDataDao.java b/src/main/java/com/casic/dao/DayDataDao.java
index 11b5eaf..2674497 100644
--- a/src/main/java/com/casic/dao/DayDataDao.java
+++ b/src/main/java/com/casic/dao/DayDataDao.java
@@ -11,11 +11,16 @@
 @Mapper
 public interface DayDataDao {
 
-    List<Map<String, Object>> getWellCoverByType();
-    List<Map<String, Object>> getStandardLiquid();
-    List<Map<String, Object>> getLiquidByType();
-    List<Map<String, Object>> getWellLocalByType();
+    List<Map<String, Object>> getWellCoverByType();
 
-    List<Map<String, Object>> getHarmfulCode();
+    int clearOnline(@Param("devcode") String devcode);
+
+    List<Map<String, Object>> getStandardLiquid();
+
+    List<Map<String, Object>> getLiquidByType();
+
+    List<Map<String, Object>> getWellLocalByType();
+
+    List<Map<String, Object>> getHarmfulCode();
 }
diff --git a/src/main/java/com/casic/entity/StandardData.java b/src/main/java/com/casic/entity/StandardData.java
new file mode 100644
index 0000000..fe7bdad
--- /dev/null
+++ b/src/main/java/com/casic/entity/StandardData.java
@@ -0,0 +1,29 @@
+package com.casic.entity;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+@Data
+public class StandardData {
+
+    @JSONField(name = "DevID")
+    private String DevID;
+    @JSONField(name = "DevType")
+    private String DevType;
+    @JSONField(name = "Provider")
+    private String Provider;
+    @JSONField(name = "Status")
+    private String Status;
+    @JSONField(name = "LogTime")
+    private String LogTime;
+
+    public StandardData(String DevID, String DevType, String Status, String LogTime) {
+        this.DevID = DevID;
+        this.DevType = DevType;
+        this.Provider = "Provider-ChangFeng";
+        this.Status = Status;
+        this.LogTime = LogTime;
+    }
+}
diff --git a/src/main/java/com/casic/kafka/Producer.java b/src/main/java/com/casic/kafka/Producer.java
new file mode 100644
index 0000000..7ef048f
--- /dev/null
+++ b/src/main/java/com/casic/kafka/Producer.java
@@ -0,0 +1,52 @@
+package com.casic.kafka;
+
+import com.casic.kafka.util.KafkaProperties;
+import com.casic.kafka.util.KafkaUtils;
+import com.casic.kafka.util.LoginUtil;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class Producer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
+    private static KafkaProducer<String, String> producer;
+
+    static {
+        try {
+            if (LoginUtil.isSecurityModel()) {
+                LoginUtil.securityPrepare(KafkaProperties.USER_PRINCIPAL, KafkaProperties.USER_KEYTAB_FILE);
+            }
+            Properties props = KafkaUtils.producerInitProperties();
+            producer = new KafkaProducer<>(props);
+        } catch (IOException ex) {
+            LOG.error("Kafka security preparation failed.", ex);
+        }
+    }
+
+    public static void send(String content, String topic) {
+        LOG.debug("producer start.");
+        if (producer == null) {
+            // NOTE: with security enabled, replace this with your own machine-machine account
+            Properties props = KafkaUtils.producerInitProperties();
+            producer = new KafkaProducer<>(props);
+        }
+        // empty key: SimplePartitioner routes these records to partition 0
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "", content);
+        try {
+            // synchronous send
+            producer.send(record).get();
+            LOG.info("Sent to kafka topic " + topic + ", payload: " + content);
+        } catch (InterruptedException ie) {
+            LOG.info("The InterruptedException occurred : {}.", ie);
+        } catch (ExecutionException ee) {
+            LOG.info("The ExecutionException occurred : {}.", ee);
+        }
+//        producer.close();
+    }
+}
diff --git a/src/main/java/com/casic/kafka/constant/TopicConstant.java b/src/main/java/com/casic/kafka/constant/TopicConstant.java
new file mode 100644
index 0000000..cebd2b2
--- /dev/null
+++ b/src/main/java/com/casic/kafka/constant/TopicConstant.java
@@ -0,0 +1,8 @@
+package com.casic.kafka.constant;
+
+public class TopicConstant {
+
+    public static final String DATA_TOPIC = "dataTopic";
+    public static final String ALARM_TOPIC = "alarmTopic";
+}
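Note: a minimal sketch of pushing one record through this wrapper. The class name SendDemo and all field values are hypothetical; only the called APIs (StandardData's constructor, Producer.send, KafkaProperties.DATA_TOPIC) come from the diffs above.

    import com.alibaba.fastjson.JSON;
    import com.casic.entity.StandardData;
    import com.casic.kafka.Producer;
    import com.casic.kafka.util.KafkaProperties;

    public class SendDemo {
        public static void main(String[] args) {
            // placeholder device and payload; Provider is filled in by the constructor
            StandardData data = new StandardData("12345678", "LiquidLevel",
                    "[{\"Key\":\"Level\",\"Value\":\"1.23\"}]", "2021-06-16 10:00:00");
            // synchronous send through the static wrapper
            Producer.send(JSON.toJSONString(data), KafkaProperties.DATA_TOPIC);
        }
    }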
diff --git a/src/main/java/com/casic/kafka/util/KafkaProperties.java b/src/main/java/com/casic/kafka/util/KafkaProperties.java
new file mode 100644
index 0000000..dedf438
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/KafkaProperties.java
@@ -0,0 +1,129 @@
+package com.casic.kafka.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public final class KafkaProperties {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
+
+    // Topic names; in security mode an administrator must grant the current user access
+    public final static String DATA_TOPIC = "TEMPSTORE_8204";
+    public final static String ALARM_TOPIC = "MSGQUEUE_8287";
+
+    /**
+     * keytab file name of the machine-machine account requested by the user
+     */
+    public static final String USER_KEYTAB_FILE = "user.keytab";
+
+    /**
+     * name of the machine-machine account requested by the user
+     */
+    public static final String USER_PRINCIPAL = "kafkauser";
+
+    private static Properties serverProps = new Properties();
+
+    private static Properties producerProps = new Properties();
+
+    private static Properties consumerProps = new Properties();
+
+    private static Properties clientProps = new Properties();
+
+    private static KafkaProperties instance = null;
+
+    private static final String filePath = "D:\\casic203\\software\\software\\data-creater\\kafka\\";
+
+    private KafkaProperties() {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        try {
+            File proFile = new File(filePath + "producer.properties");
+
+            if (proFile.exists()) {
+                producerProps.load(new FileInputStream(filePath + "producer.properties"));
+            }
+
+            File conFile = new File(filePath + "consumer.properties");
+
+            if (conFile.exists()) {
+                consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
+            }
+
+            File serFile = new File(filePath + "server.properties");
+
+            if (serFile.exists()) {
+                serverProps.load(new FileInputStream(filePath + "server.properties"));
+            }
+
+            File cliFile = new File(filePath + "client.properties");
+
+            if (cliFile.exists()) {
+                clientProps.load(new FileInputStream(filePath + "client.properties"));
+            }
+        } catch (IOException e) {
+            LOG.info("The Exception occurred.", e);
+        }
+    }
+
+    public synchronized static KafkaProperties getInstance() {
+        if (null == instance) {
+            instance = new KafkaProperties();
+        }
+        return instance;
+    }
+
+    /**
+     * Look up a configuration value.
+     *
+     * @param key properties key
+     * @param defValue default value
+     * @return the configured value, or defValue when no file defines the key
+     */
+    public String getValues(String key, String defValue) {
+        String rtValue = null;
+
+        if (null == key) {
+            LOG.error("key is null");
+        } else {
+            rtValue = getPropertiesValue(key);
+        }
+
+        if (null == rtValue) {
+            LOG.warn("KafkaProperties.getValues return null, key is " + key);
+            rtValue = defValue;
+        }
+
+        LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
+
+        return rtValue;
+    }
+
+    /**
+     * Resolve a key, checking server.properties first.
+     *
+     * @param key
+     * @return
+     */
+    private String getPropertiesValue(String key) {
+        String rtValue = serverProps.getProperty(key);
+
+        // not in server.properties, fall back to producer.properties
+        if (null == rtValue) {
+            rtValue = producerProps.getProperty(key);
+        }
+
+        // not in producer.properties, fall back to consumer.properties
+        if (null == rtValue) {
+            rtValue = consumerProps.getProperty(key);
+        }
+
+        // not in consumer.properties, fall back to client.properties
+        if (null == rtValue) {
+            rtValue = clientProps.getProperty(key);
+        }
+
+        return rtValue;
+    }
+}
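Note: the loader above reads up to four optional files from the configured directory. A minimal producer.properties it could pick up might look like this; the broker list and domain are placeholders, but the keys are exactly those queried in KafkaUtils below:

    bootstrap.servers=broker1.hadoop.com:21007,broker2.hadoop.com:21007
    security.protocol=SASL_PLAINTEXT
    kerberos.domain.name=hadoop.hadoop.com
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    partitioner.class=com.casic.kafka.util.SimplePartitioner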
diff --git a/src/main/java/com/casic/kafka/util/KafkaUtils.java b/src/main/java/com/casic/kafka/util/KafkaUtils.java
new file mode 100644
index 0000000..bbf6830
--- /dev/null
+++ b/src/main/java/com/casic/kafka/util/KafkaUtils.java
@@ -0,0 +1,136 @@
+package com.casic.kafka.util;
+
+import java.util.Properties;
+
+public class KafkaUtils {
+
+    // maximum wait time for a single request (ms)
+    private final int waitTime = 1000;
+
+    // broker connection address
+    private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
+
+    // group id
+    private final static String GROUP_ID = "group.id";
+
+    // deserializer class for message values
+    private final static String VALUE_DESERIALIZER = "value.deserializer";
+
+    // deserializer class for message keys
+    private final static String KEY_DESERIALIZER = "key.deserializer";
+
+    // protocol type: currently SASL_PLAINTEXT or PLAINTEXT
+    private final static String SECURITY_PROTOCOL = "security.protocol";
+
+    // service name
+    private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
+
+    // domain name
+    private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
+
+    // whether offsets are committed automatically
+    private final static String ENABLE_AUTO_COMMIT = "enable.auto.commit";
+
+    // interval between automatic offset commits
+    private final static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
+
+    // session timeout
+    private final static String SESSION_TIMEOUT_MS = "session.timeout.ms";
+
+    // client id
+    private final static String CLIENT_ID = "client.id";
+
+    // serializer class for message keys
+    private final static String KEY_SERIALIZER = "key.serializer";
+
+    // serializer class for message values
+    private final static String VALUE_SERIALIZER = "value.serializer";
+
+    // partitioner class name
+    private final static String PARTITIONER_NAME = "partitioner.class";
+
+    // send 100 messages by default
+    private final static int MESSAGE_NUM = 100;
+
+    /**
+     * keytab file name of the machine-machine account requested by the user
+     */
+    private static final String USER_KEYTAB_FILE = "user.keytab";
+
+    /**
+     * name of the machine-machine account requested by the user
+     */
+    private static final String USER_PRINCIPAL = "kafkauser";
+
+    public static Properties consumerInitProperties() {
+        Properties props = new Properties();
+        KafkaProperties kafkaProc = KafkaProperties.getInstance();
+
+        // broker connection address
+        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
+        // group id
+        props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
+        // whether offsets are committed automatically
+        props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
+        // interval between automatic offset commits
+        props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS, "1000"));
+        // session timeout
+        props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
+        // deserializer class for message keys
+        props.put(KEY_DESERIALIZER,
+            kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
+        // deserializer class for message values
+        props.put(VALUE_DESERIALIZER,
+            kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
+        // security protocol type
+        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
+
+//        props.put(SASL_MECHANISM, "GSSAPI");
+        // service name
+        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
+        // domain name
+        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
+
+//        System.setProperty("java.security.auth.login.config", "D:\\cz\\203\\file\\yizhuang\\src\\main\\resources\\77042.jaas.conf");
+
+        return props;
+    }
+
+    public static Properties producerInitProperties() {
+        Properties props = new Properties();
+        KafkaProperties kafkaProc = KafkaProperties.getInstance();
+
+        // broker address list
+        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
+        // client id
+        props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
+        // serializer class for message keys
+        props.put(KEY_SERIALIZER,
+            kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+        // serializer class for message values
+        props.put(VALUE_SERIALIZER,
+            kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+        // protocol type: currently SASL_PLAINTEXT or PLAINTEXT
+        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
+        // service name
+        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
+        // domain name
+        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
+        // partitioner class name
+        props.put(PARTITIONER_NAME,
+            kafkaProc.getValues(PARTITIONER_NAME, "com.casic.kafka.util.SimplePartitioner"));
+
+        System.setProperty("java.security.auth.login.config", "D:\\casic203\\software\\software\\data-creater\\kafka\\kafkaClient.jaas.conf");
+
+        return props;
+    }
+}
+ dataMap.put("Value", liquidData.getLiquiddata()); + mapList2.add(dataMap); + mapList2.add(cellDataMap); + initTime -= random.nextInt(200000); liquidData.setLogtime(new Date(initTime)); + dayDataDao.clearOnline(devcode); liquidDataMapper.insert(liquidData); + sendData(liquidData.getLogtime(), devcode, mapList2); } } + private void sendData(Date logtime, String devcode, List> mapList2) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String logTime = sdf.format(logtime); + if (!StringUtils.isEmpty(kafkaTopicConfig.getKafkaSendTopic())) { + StandardData standardData = new StandardData(devcode, "LiquidLevel", JSON.toJSONString(mapList2), logTime); + Producer.send(JSON.toJSONString(standardData), kafkaTopicConfig.getKafkaSendTopic()); + } + } + + /** * 4天发六次 */ private void CreateWellDataLocal(String devcode, String wellCode) { WellLocalData wellLocalData = new WellLocalData(); DecimalFormat df = new DecimalFormat("0.000000"); -// wellLocalData.setId(SnowBizPhyId.getBizPhyId()); +// wellLocalData.setId(SnowBizPhyId.getBizPhyId()); wellLocalData.setWellCode(wellCode); wellLocalData.setDevcode(devcode); Double lng = 116.54 + Math.random() * 0.02 - 0.01; Double lat = 39.79 + Math.random() * 0.02 - 0.01; wellLocalData.setLng(df.format(lng)); wellLocalData.setLat(df.format(lat)); - Long initTime = new Date().getTime(); + long initTime = System.currentTimeMillis(); for (int i = 0; i < 6; i++) { initTime += random.nextInt(200000); wellLocalData.setLogtime(new Date(initTime)); diff --git a/src/main/resources/config/application.yml b/src/main/resources/config/application.yml index 079d5b3..b199a3a 100644 --- a/src/main/resources/config/application.yml +++ b/src/main/resources/config/application.yml @@ -25,18 +25,17 @@ flowable: checkProcessDefinitions: false #不校验process文件 casic: + alarm: + topic: MSGQUEUE_8287 #监测数据kafka数据发布主题 + data: + topic: TEMPSTORE_8204 #告警消息kafka数据发布主题 + kafka-Kerb-Url: D:\casic203\software\software\kafka-kaina\ device: - redis: - invalid-time: 86400 - host: 127.0.0.1 - port: 6379 - password: ew5T4K3#203lwh - config-prefix: 'Casic:' hour-types: 8 day-types: 1,2,7 task: cron: '0 0/1 * * * ? ' interval: - one-day: '0 0/1 * * * ? ' - three-day: '0 0 0 */3 * ? ' + one-day: '0 0/1 0 */1 * ? ' + three-day: '0 0/10 * * * ? ' four-day: '0 0 0 */4 * ? ' \ No newline at end of file diff --git a/src/main/resources/mapper/DayDataDao.xml b/src/main/resources/mapper/DayDataDao.xml index 15b2c51..0039750 100644 --- a/src/main/resources/mapper/DayDataDao.xml +++ b/src/main/resources/mapper/DayDataDao.xml @@ -31,6 +31,13 @@ ) + + + UPDATE bus_device + SET ONLINE_STATE=1 + WHERE VALID =1 AND DEVCODE=#{devcode} + +