diff --git a/pom.xml b/pom.xml
index a806efa..82e3252 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,5 +262,19 @@
spring-jms
3.2.8.RELEASE
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.11.0.1
+
+
+ org.springframework.kafka
+ spring-kafka
+ 1.1.1.RELEASE
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a806efa..82e3252 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,5 +262,19 @@
spring-jms
3.2.8.RELEASE
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.11.0.1
+
+
+ org.springframework.kafka
+ spring-kafka
+ 1.1.1.RELEASE
+
+
\ No newline at end of file
diff --git a/src/main/java/org/well/well/kafka/KafkaConsumer.java b/src/main/java/org/well/well/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..3f8cbcf
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaConsumer.java
@@ -0,0 +1,12 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+public class KafkaConsumer implements MessageListener {
+
+ @Override
+ public void onMessage(ConsumerRecord record) {
+ String value = record.value();
+ }
+}
diff --git a/pom.xml b/pom.xml
index a806efa..82e3252 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,5 +262,19 @@
spring-jms
3.2.8.RELEASE
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.11.0.1
+
+
+ org.springframework.kafka
+ spring-kafka
+ 1.1.1.RELEASE
+
+
\ No newline at end of file
diff --git a/src/main/java/org/well/well/kafka/KafkaConsumer.java b/src/main/java/org/well/well/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..3f8cbcf
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaConsumer.java
@@ -0,0 +1,12 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+public class KafkaConsumer implements MessageListener {
+
+ @Override
+ public void onMessage(ConsumerRecord record) {
+ String value = record.value();
+ }
+}
diff --git a/src/main/java/org/well/well/kafka/KafkaTool.java b/src/main/java/org/well/well/kafka/KafkaTool.java
new file mode 100644
index 0000000..92a274c
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaTool.java
@@ -0,0 +1,90 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaTool {
+
+ public static void getMessage() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test");
+ props.put("enable.auto.commit", "false");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+
+ public static void getMessage1() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test1");
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor","sensor1"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("消费者1offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+ public static void sendMessage(String topic, String content) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "127.0.0.1:9092");
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ Producer producer = new KafkaProducer<>(props);
+ producer.send(new ProducerRecord(topic, "22", content));
+ producer.close();
+ System.out.println("发送消息成功" + content);
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index a806efa..82e3252 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,5 +262,19 @@
spring-jms
3.2.8.RELEASE
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.11.0.1
+
+
+ org.springframework.kafka
+ spring-kafka
+ 1.1.1.RELEASE
+
+
\ No newline at end of file
diff --git a/src/main/java/org/well/well/kafka/KafkaConsumer.java b/src/main/java/org/well/well/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..3f8cbcf
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaConsumer.java
@@ -0,0 +1,12 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+public class KafkaConsumer implements MessageListener {
+
+ @Override
+ public void onMessage(ConsumerRecord record) {
+ String value = record.value();
+ }
+}
diff --git a/src/main/java/org/well/well/kafka/KafkaTool.java b/src/main/java/org/well/well/kafka/KafkaTool.java
new file mode 100644
index 0000000..92a274c
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaTool.java
@@ -0,0 +1,90 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaTool {
+
+ public static void getMessage() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test");
+ props.put("enable.auto.commit", "false");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+
+ public static void getMessage1() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test1");
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor","sensor1"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("消费者1offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+ public static void sendMessage(String topic, String content) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "127.0.0.1:9092");
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ Producer producer = new KafkaProducer<>(props);
+ producer.send(new ProducerRecord(topic, "22", content));
+ producer.close();
+ System.out.println("发送消息成功" + content);
+ }
+
+}
diff --git a/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java b/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java
new file mode 100644
index 0000000..fbdc8ac
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java
@@ -0,0 +1,28 @@
+package org.well.well.kafka;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.listener.MessageListener;
+
+public class TopicApiUseKafkaListener implements MessageListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TopicApiUseKafkaListener.class);
+
+ private static final String TABLE_PRE = "";
+
+ @Override
+ public void onMessage(ConsumerRecord stringStringConsumerRecord) {
+
+ String value = stringStringConsumerRecord.value();
+ JSONObject bodyJson = JSONObject.parseObject(value);
+
+ Integer platform = bodyJson.getInteger("platform");
+ String url = bodyJson.getString("url");
+ String apiVersion = bodyJson.getString("apiVersion");
+ String createDate = bodyJson.getString("createDate");
+
+
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index a806efa..82e3252 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,5 +262,19 @@
spring-jms
3.2.8.RELEASE
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.11.0.1
+
+
+ org.springframework.kafka
+ spring-kafka
+ 1.1.1.RELEASE
+
+
\ No newline at end of file
diff --git a/src/main/java/org/well/well/kafka/KafkaConsumer.java b/src/main/java/org/well/well/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..3f8cbcf
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaConsumer.java
@@ -0,0 +1,12 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+public class KafkaConsumer implements MessageListener {
+
+ @Override
+ public void onMessage(ConsumerRecord record) {
+ String value = record.value();
+ }
+}
diff --git a/src/main/java/org/well/well/kafka/KafkaTool.java b/src/main/java/org/well/well/kafka/KafkaTool.java
new file mode 100644
index 0000000..92a274c
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaTool.java
@@ -0,0 +1,90 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaTool {
+
+ public static void getMessage() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test");
+ props.put("enable.auto.commit", "false");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+
+ public static void getMessage1() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test1");
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor","sensor1"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("消费者1offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+ public static void sendMessage(String topic, String content) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "127.0.0.1:9092");
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ Producer producer = new KafkaProducer<>(props);
+ producer.send(new ProducerRecord(topic, "22", content));
+ producer.close();
+ System.out.println("发送消息成功" + content);
+ }
+
+}
diff --git a/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java b/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java
new file mode 100644
index 0000000..fbdc8ac
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java
@@ -0,0 +1,28 @@
+package org.well.well.kafka;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.listener.MessageListener;
+
+public class TopicApiUseKafkaListener implements MessageListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TopicApiUseKafkaListener.class);
+
+ private static final String TABLE_PRE = "";
+
+ @Override
+ public void onMessage(ConsumerRecord stringStringConsumerRecord) {
+
+ String value = stringStringConsumerRecord.value();
+ JSONObject bodyJson = JSONObject.parseObject(value);
+
+ Integer platform = bodyJson.getInteger("platform");
+ String url = bodyJson.getString("url");
+ String apiVersion = bodyJson.getString("apiVersion");
+ String createDate = bodyJson.getString("createDate");
+
+
+ }
+
+}
diff --git a/src/main/resources/wellSensor/application.properties b/src/main/resources/wellSensor/application.properties
index dee4d13..5446317 100644
--- a/src/main/resources/wellSensor/application.properties
+++ b/src/main/resources/wellSensor/application.properties
@@ -163,6 +163,14 @@
activemq_password =
+
+# ============================================================================
+# kafka配置
+# ============================================================================
+bootstrap.servers = 111.198.10.15:12502
+kafka.topic = test
+bootstrap.groupid =0
+
# ============================================================================
# 告警、工单推送地址
# ============================================================================
diff --git a/pom.xml b/pom.xml
index a806efa..82e3252 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,5 +262,19 @@
spring-jms
3.2.8.RELEASE
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.11.0.1
+
+
+ org.springframework.kafka
+ spring-kafka
+ 1.1.1.RELEASE
+
+
\ No newline at end of file
diff --git a/src/main/java/org/well/well/kafka/KafkaConsumer.java b/src/main/java/org/well/well/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..3f8cbcf
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaConsumer.java
@@ -0,0 +1,12 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+public class KafkaConsumer implements MessageListener {
+
+ @Override
+ public void onMessage(ConsumerRecord record) {
+ String value = record.value();
+ }
+}
diff --git a/src/main/java/org/well/well/kafka/KafkaTool.java b/src/main/java/org/well/well/kafka/KafkaTool.java
new file mode 100644
index 0000000..92a274c
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/KafkaTool.java
@@ -0,0 +1,90 @@
+package org.well.well.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaTool {
+
+ public static void getMessage() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test");
+ props.put("enable.auto.commit", "false");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+
+ public static void getMessage1() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test1");
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ // 消费者订阅的topic, 可同时订阅多个
+// consumer.subscribe(Arrays.asList("first", "second", "third"));
+ consumer.subscribe(Arrays.asList("sensor","sensor1"));
+
+ while (true) {
+ // 读取数据,读取超时时间为100ms
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records)
+ System.out.printf("消费者1offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ consumer.commitSync();
+ }
+ }
+
+
+ public static void sendMessage(String topic, String content) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "127.0.0.1:9092");
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ Producer producer = new KafkaProducer<>(props);
+ producer.send(new ProducerRecord(topic, "22", content));
+ producer.close();
+ System.out.println("发送消息成功" + content);
+ }
+
+}
diff --git a/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java b/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java
new file mode 100644
index 0000000..fbdc8ac
--- /dev/null
+++ b/src/main/java/org/well/well/kafka/TopicApiUseKafkaListener.java
@@ -0,0 +1,28 @@
+package org.well.well.kafka;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.listener.MessageListener;
+
+public class TopicApiUseKafkaListener implements MessageListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TopicApiUseKafkaListener.class);
+
+ private static final String TABLE_PRE = "";
+
+ @Override
+ public void onMessage(ConsumerRecord stringStringConsumerRecord) {
+
+ String value = stringStringConsumerRecord.value();
+ JSONObject bodyJson = JSONObject.parseObject(value);
+
+ Integer platform = bodyJson.getInteger("platform");
+ String url = bodyJson.getString("url");
+ String apiVersion = bodyJson.getString("apiVersion");
+ String createDate = bodyJson.getString("createDate");
+
+
+ }
+
+}
diff --git a/src/main/resources/wellSensor/application.properties b/src/main/resources/wellSensor/application.properties
index dee4d13..5446317 100644
--- a/src/main/resources/wellSensor/application.properties
+++ b/src/main/resources/wellSensor/application.properties
@@ -163,6 +163,14 @@
activemq_password =
+
+# ============================================================================
+# kafka配置
+# ============================================================================
+bootstrap.servers = 111.198.10.15:12502
+kafka.topic = test
+bootstrap.groupid =0
+
# ============================================================================
# 告警、工单推送地址
# ============================================================================
diff --git a/src/main/resources/wellSensor/applicationContex-kafka.xml b/src/main/resources/wellSensor/applicationContex-kafka.xml
new file mode 100644
index 0000000..f3430e4
--- /dev/null
+++ b/src/main/resources/wellSensor/applicationContex-kafka.xml
@@ -0,0 +1,53 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file