diff --git a/casic-server/pom.xml b/casic-server/pom.xml
index 15cce6e..c4b6c82 100644
--- a/casic-server/pom.xml
+++ b/casic-server/pom.xml
@@ -36,6 +36,10 @@
             <scope>provided</scope>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
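Note that the consumer configuration added further down sets listener.missing-topics-fatal: false, so the application will still start if the noise topic has not been created on the broker. If the topic should instead be provisioned by the application, spring-kafka can declare it through the auto-configured KafkaAdmin. The following is a minimal sketch only, assuming a single partition and replica and a hypothetical NoiseTopicConfig class, none of which appear in the original change:

package com.casic.missiles.modular.system.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical topic-provisioning config; partition/replica counts are assumptions.
@Configuration
public class NoiseTopicConfig {

    @Bean
    public NewTopic noiseTopic() {
        // Declares the "noise" topic so Spring Boot's auto-configured KafkaAdmin
        // creates it on the broker at startup if it does not already exist.
        return TopicBuilder.name("noise")
                .partitions(1)
                .replicas(1)
                .build();
    }
}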
diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..f5327cc
--- /dev/null
+++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java
@@ -0,0 +1,26 @@
+package com.casic.missiles.modular.system.kafka;
+
+
+import com.casic.missiles.modular.system.service.INoiseService;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class KafkaConsumer {
+
+
+    @Autowired
+    private INoiseService iNoiseService;
+
+    // Listen for and consume messages on the "noise" topic
+    @KafkaListener(topics = {"noise"})
+    public void onNormalMessage(ConsumerRecord<String, String> record) {
+        System.out.println("Simple consume: " + record.topic() + "-" + record.partition() + "=" +
+                record.value());
+        String recordValue = record.value();
+        // Persist to the database
+        iNoiseService.processData(recordValue);
+    }
+}
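The listener above consumes from the noise topic, but the producing side is not part of this change. As a minimal sketch of how a message could be published against the producer settings added in application-dev.yml, using spring-kafka's KafkaTemplate (the NoiseProducer class and sendNoiseData method are illustrative assumptions, not existing code):

package com.casic.missiles.modular.system.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical producer counterpart; only the "noise" topic name comes from the original diff.
@Component
public class NoiseProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendNoiseData(String payload) {
        // Asynchronously publishes the payload; key and value are Strings,
        // matching the serializers configured in application-dev.yml.
        kafkaTemplate.send("noise", payload);
    }
}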
diff --git a/casic-web/src/main/resources/config/application-dev.yml b/casic-web/src/main/resources/config/application-dev.yml
index 131cf1a..cb3c6ae 100644
--- a/casic-web/src/main/resources/config/application-dev.yml
+++ b/casic-web/src/main/resources/config/application-dev.yml
@@ -7,6 +7,42 @@
     url: jdbc:mysql://111.198.10.15:11336/casic_correlator?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=UTC&&allowMultiQueries=true
     username: root
     password: Casic203
+  kafka:
+    bootstrap-servers: 111.198.10.15:12502 # Kafka broker address; must match the listener configured in server.properties
+    producer:
+      batch-size: 16384 # batch size in bytes
+      acks: -1 # ack level: how many partition replicas must confirm the write before the producer receives an ack (0, 1, all/-1)
+      retries: 3 # number of send retries
+      #transaction-id-prefix: transaction
+      buffer-memory: 33554432
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      properties:
+        linger:
+          ms: 2000 # send delay: how long a batch waits to fill before it is sent
+        #partitioner: # custom partitioner
+          #class: pers.zhang.config.CustomerPartitionHandler
+    consumer:
+      group-id: correlatorGroup # default consumer group ID
+      enable-auto-commit: true # whether offsets are committed automatically
+      auto-commit-interval: 2000 # offset auto-commit interval
+      # When Kafka has no initial offset, or the offset is out of range, the offset is reset automatically:
+      # earliest: reset to the smallest offset in the partition;
+      # latest: reset to the latest offset in the partition (consume only data produced after the reset);
+      # none: throw an exception if any partition has no committed offset;
+      auto-offset-reset: latest
+      max-poll-records: 500 # maximum number of records returned by a single poll
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      properties:
+        session:
+          timeout:
+            ms: 120000 # consumer session timeout (a rebalance is triggered if no heartbeat is sent within this window)
+        request:
+          timeout:
+            ms: 18000 # consumer request timeout
+    listener:
+      missing-topics-fatal: false # when true, application startup fails if the listener topics do not exist
   jms:
     pub-sub-domain: true
 #    session:
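For reference, the spring.kafka.consumer block above is what Spring Boot uses to build its auto-configured ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans, which back the @KafkaListener in KafkaConsumer. The hand-rolled equivalent below is a sketch only, assuming the same values; it is not needed unless the setup must be customized beyond what spring.kafka.* exposes, and the NoiseConsumerConfig class name is hypothetical:

package com.casic.missiles.modular.system.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hand-rolled equivalent of the YAML consumer settings; normally unnecessary because
// Spring Boot derives these beans from spring.kafka.* automatically.
@Configuration
public class NoiseConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> noiseConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "111.198.10.15:12502");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "correlatorGroup");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 2000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 18000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> noiseConsumerFactory) {
        // Declaring a bean with this name replaces Spring Boot's default container factory.
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(noiseConsumerFactory);
        return factory;
    }
}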