diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 15cce6e..c4b6c82 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -36,6 +36,10 @@ provided + + org.springframework.kafka + spring-kafka + diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 15cce6e..c4b6c82 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -36,6 +36,10 @@ provided + + org.springframework.kafka + spring-kafka + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java new file mode 100644 index 0000000..f5327cc --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -0,0 +1,26 @@ +package com.casic.missiles.modular.system.kafka; + + +import com.casic.missiles.modular.system.service.INoiseService; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class KafkaConsumer { + + + @Autowired + private INoiseService iNoiseService; + + //监听消费 + @KafkaListener(topics = {"noise"}) + public void onNormalMessage(ConsumerRecord record) { + System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" + + record.value()); + String recordValue= record.value().toString(); + //存库 + iNoiseService.processData(recordValue); + } +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index 15cce6e..c4b6c82 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -36,6 +36,10 @@ provided + + org.springframework.kafka + spring-kafka + diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java new file mode 100644 index 0000000..f5327cc --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -0,0 +1,26 @@ +package com.casic.missiles.modular.system.kafka; + + +import com.casic.missiles.modular.system.service.INoiseService; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class KafkaConsumer { + + + @Autowired + private INoiseService iNoiseService; + + //监听消费 + @KafkaListener(topics = {"noise"}) + public void onNormalMessage(ConsumerRecord record) { + System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" + + record.value()); + String recordValue= record.value().toString(); + //存库 + iNoiseService.processData(recordValue); + } +} diff --git a/casic-web/src/main/resources/config/application-dev.yml b/casic-web/src/main/resources/config/application-dev.yml index 131cf1a..cb3c6ae 100644 --- a/casic-web/src/main/resources/config/application-dev.yml +++ b/casic-web/src/main/resources/config/application-dev.yml @@ -7,6 +7,42 @@ url: jdbc:mysql://111.198.10.15:11336/casic_correlator?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=UTC&&allowMultiQueries=true username: root password: Casic203 + kafka: + bootstrap-servers: 111.198.10.15:12502 #这个是kafka的地址,对应你server.properties中配置的 + producer: + batch-size: 16384 #批量大小 + acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) + retries: 3 # 消息发送重试次数 + #transaction-id-prefix: transaction + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + properties: + linger: + ms: 2000 #提交延迟 + #partitioner: #指定分区器 + #class: pers.zhang.config.CustomerPartitionHandler + consumer: + group-id: correlatorGroup #默认的消费组ID + enable-auto-commit: true #是否自动提交offset + auto-commit-interval: 2000 #提交offset延时 + # 当kafka中没有初始offset或offset超出范围时将自动重置offset + # earliest:重置为分区中最小的offset; + # latest:重置为分区中最新的offset(消费分区中新产生的数据); + # none:只要有一个分区不存在已提交的offset,就抛出异常; + auto-offset-reset: latest + max-poll-records: 500 #单次拉取消息的最大条数 + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + session: + timeout: + ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作) + request: + timeout: + ms: 18000 # 消费请求的超时时间 + listener: + missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错 jms: pub-sub-domain: true # session: