diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java index f5327cc..6e308bc 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -22,5 +22,6 @@ String recordValue= record.value().toString(); //存库 iNoiseService.processData(recordValue); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java index f5327cc..6e308bc 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -22,5 +22,6 @@ String recordValue= record.value().toString(); //存库 iNoiseService.processData(recordValue); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..dc21bea --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java @@ -0,0 +1,67 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.config.KafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Configuration +//@EnableKafka +//public class KafkaConsumerConfig { +// +// @Value("${spring.kafka.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.consumer.enable-auto-commit}") +// private boolean enableAutoCommit; +// @Value("${spring.kafka.consumer.properties.session.timeout.ms}") +// private String sessionTimeout; +// @Value("${spring.kafka.consumer.auto-commit-interval}") +// private String autoCommitInterval; +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// @Value("${spring.kafka.consumer.auto-offset-reset}") +// private String autoOffsetReset; +// @Value("${spring.kafka.consumer.concurrency}") +// private int concurrency; +// +// @Bean +// public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// factory.setConcurrency(concurrency); +// factory.getContainerProperties().setPollTimeout(1500); +// return factory; +// } +// +// private ConsumerFactory consumerFactory() { +// return new DefaultKafkaConsumerFactory<>( +// consumerConfigs(), +// new StringDeserializer(), +// new StringDeserializer() +// ); +// } +// +// private Map consumerConfigs() { +// Map propsMap = new HashMap<>(); +// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); +// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); +// propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); +// propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); +// return propsMap; +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java index f5327cc..6e308bc 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -22,5 +22,6 @@ String recordValue= record.value().toString(); //存库 iNoiseService.processData(recordValue); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..dc21bea --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java @@ -0,0 +1,67 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.config.KafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Configuration +//@EnableKafka +//public class KafkaConsumerConfig { +// +// @Value("${spring.kafka.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.consumer.enable-auto-commit}") +// private boolean enableAutoCommit; +// @Value("${spring.kafka.consumer.properties.session.timeout.ms}") +// private String sessionTimeout; +// @Value("${spring.kafka.consumer.auto-commit-interval}") +// private String autoCommitInterval; +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// @Value("${spring.kafka.consumer.auto-offset-reset}") +// private String autoOffsetReset; +// @Value("${spring.kafka.consumer.concurrency}") +// private int concurrency; +// +// @Bean +// public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// factory.setConcurrency(concurrency); +// factory.getContainerProperties().setPollTimeout(1500); +// return factory; +// } +// +// private ConsumerFactory consumerFactory() { +// return new DefaultKafkaConsumerFactory<>( +// consumerConfigs(), +// new StringDeserializer(), +// new StringDeserializer() +// ); +// } +// +// private Map consumerConfigs() { +// Map propsMap = new HashMap<>(); +// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); +// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); +// propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); +// propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); +// return propsMap; +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..eeb9622 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java @@ -0,0 +1,55 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import java.util.HashMap; +//import java.util.Map; +// +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.apache.kafka.common.serialization.StringSerializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.core.DefaultKafkaProducerFactory; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.kafka.core.ProducerFactory; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +//import org.springframework.kafka.support.serializer.JsonSerializer; +// +//@Configuration +//@EnableKafka +//public class KafkaProducerConfig { +// +// @Value("${spring.kafka.producer.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.producer.retries}") +// private int retries; +// @Value("${spring.kafka.producer.batch-size}") +// private int batchSize; +// @Value("${spring.kafka.producer.properties.linger.ms}") +// private int linger; +// @Value("${spring.kafka.producer.buffer-memory}") +// private int bufferMemory; +// +// +// public Map producerConfigs() { +// Map props = new HashMap<>(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// props.put(ProducerConfig.RETRIES_CONFIG, retries); +// props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); +// props.put(ProducerConfig.LINGER_MS_CONFIG, linger); +// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// return props; +// } +// +// public ProducerFactory producerFactory() { +// return new DefaultKafkaProducerFactory<>(producerConfigs()); +// } +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java index f5327cc..6e308bc 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -22,5 +22,6 @@ String recordValue= record.value().toString(); //存库 iNoiseService.processData(recordValue); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..dc21bea --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java @@ -0,0 +1,67 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.config.KafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Configuration +//@EnableKafka +//public class KafkaConsumerConfig { +// +// @Value("${spring.kafka.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.consumer.enable-auto-commit}") +// private boolean enableAutoCommit; +// @Value("${spring.kafka.consumer.properties.session.timeout.ms}") +// private String sessionTimeout; +// @Value("${spring.kafka.consumer.auto-commit-interval}") +// private String autoCommitInterval; +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// @Value("${spring.kafka.consumer.auto-offset-reset}") +// private String autoOffsetReset; +// @Value("${spring.kafka.consumer.concurrency}") +// private int concurrency; +// +// @Bean +// public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// factory.setConcurrency(concurrency); +// factory.getContainerProperties().setPollTimeout(1500); +// return factory; +// } +// +// private ConsumerFactory consumerFactory() { +// return new DefaultKafkaConsumerFactory<>( +// consumerConfigs(), +// new StringDeserializer(), +// new StringDeserializer() +// ); +// } +// +// private Map consumerConfigs() { +// Map propsMap = new HashMap<>(); +// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); +// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); +// propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); +// propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); +// return propsMap; +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..eeb9622 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java @@ -0,0 +1,55 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import java.util.HashMap; +//import java.util.Map; +// +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.apache.kafka.common.serialization.StringSerializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.core.DefaultKafkaProducerFactory; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.kafka.core.ProducerFactory; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +//import org.springframework.kafka.support.serializer.JsonSerializer; +// +//@Configuration +//@EnableKafka +//public class KafkaProducerConfig { +// +// @Value("${spring.kafka.producer.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.producer.retries}") +// private int retries; +// @Value("${spring.kafka.producer.batch-size}") +// private int batchSize; +// @Value("${spring.kafka.producer.properties.linger.ms}") +// private int linger; +// @Value("${spring.kafka.producer.buffer-memory}") +// private int bufferMemory; +// +// +// public Map producerConfigs() { +// Map props = new HashMap<>(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// props.put(ProducerConfig.RETRIES_CONFIG, retries); +// props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); +// props.put(ProducerConfig.LINGER_MS_CONFIG, linger); +// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// return props; +// } +// +// public ProducerFactory producerFactory() { +// return new DefaultKafkaProducerFactory<>(producerConfigs()); +// } +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java new file mode 100644 index 0000000..16af70a --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java @@ -0,0 +1,18 @@ +//package com.casic.missiles.modular.system.kafka; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.beans.factory.annotation.Qualifier; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.stereotype.Component; +// +//@Component +//public class KafkaToolProducer { +// +// @Autowired +// @Qualifier("kafkaTemplate") +// private KafkaTemplate kafkaTemplate; +// +// public void send(String topic, String message) { +// kafkaTemplate.send(topic, message); +// } +//} +// diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java index f5327cc..6e308bc 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -22,5 +22,6 @@ String recordValue= record.value().toString(); //存库 iNoiseService.processData(recordValue); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..dc21bea --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java @@ -0,0 +1,67 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.config.KafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Configuration +//@EnableKafka +//public class KafkaConsumerConfig { +// +// @Value("${spring.kafka.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.consumer.enable-auto-commit}") +// private boolean enableAutoCommit; +// @Value("${spring.kafka.consumer.properties.session.timeout.ms}") +// private String sessionTimeout; +// @Value("${spring.kafka.consumer.auto-commit-interval}") +// private String autoCommitInterval; +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// @Value("${spring.kafka.consumer.auto-offset-reset}") +// private String autoOffsetReset; +// @Value("${spring.kafka.consumer.concurrency}") +// private int concurrency; +// +// @Bean +// public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// factory.setConcurrency(concurrency); +// factory.getContainerProperties().setPollTimeout(1500); +// return factory; +// } +// +// private ConsumerFactory consumerFactory() { +// return new DefaultKafkaConsumerFactory<>( +// consumerConfigs(), +// new StringDeserializer(), +// new StringDeserializer() +// ); +// } +// +// private Map consumerConfigs() { +// Map propsMap = new HashMap<>(); +// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); +// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); +// propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); +// propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); +// return propsMap; +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..eeb9622 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java @@ -0,0 +1,55 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import java.util.HashMap; +//import java.util.Map; +// +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.apache.kafka.common.serialization.StringSerializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.core.DefaultKafkaProducerFactory; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.kafka.core.ProducerFactory; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +//import org.springframework.kafka.support.serializer.JsonSerializer; +// +//@Configuration +//@EnableKafka +//public class KafkaProducerConfig { +// +// @Value("${spring.kafka.producer.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.producer.retries}") +// private int retries; +// @Value("${spring.kafka.producer.batch-size}") +// private int batchSize; +// @Value("${spring.kafka.producer.properties.linger.ms}") +// private int linger; +// @Value("${spring.kafka.producer.buffer-memory}") +// private int bufferMemory; +// +// +// public Map producerConfigs() { +// Map props = new HashMap<>(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// props.put(ProducerConfig.RETRIES_CONFIG, retries); +// props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); +// props.put(ProducerConfig.LINGER_MS_CONFIG, linger); +// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// return props; +// } +// +// public ProducerFactory producerFactory() { +// return new DefaultKafkaProducerFactory<>(producerConfigs()); +// } +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java new file mode 100644 index 0000000..16af70a --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java @@ -0,0 +1,18 @@ +//package com.casic.missiles.modular.system.kafka; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.beans.factory.annotation.Qualifier; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.stereotype.Component; +// +//@Component +//public class KafkaToolProducer { +// +// @Autowired +// @Qualifier("kafkaTemplate") +// private KafkaTemplate kafkaTemplate; +// +// public void send(String topic, String message) { +// kafkaTemplate.send(topic, message); +// } +//} +// diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java index 9ccbd57..0720b08 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java @@ -15,4 +15,6 @@ void processData(String recordValue); + String getLastVaule(String devCode); + } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java index f5327cc..6e308bc 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -22,5 +22,6 @@ String recordValue= record.value().toString(); //存库 iNoiseService.processData(recordValue); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..dc21bea --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java @@ -0,0 +1,67 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.config.KafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Configuration +//@EnableKafka +//public class KafkaConsumerConfig { +// +// @Value("${spring.kafka.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.consumer.enable-auto-commit}") +// private boolean enableAutoCommit; +// @Value("${spring.kafka.consumer.properties.session.timeout.ms}") +// private String sessionTimeout; +// @Value("${spring.kafka.consumer.auto-commit-interval}") +// private String autoCommitInterval; +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// @Value("${spring.kafka.consumer.auto-offset-reset}") +// private String autoOffsetReset; +// @Value("${spring.kafka.consumer.concurrency}") +// private int concurrency; +// +// @Bean +// public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// factory.setConcurrency(concurrency); +// factory.getContainerProperties().setPollTimeout(1500); +// return factory; +// } +// +// private ConsumerFactory consumerFactory() { +// return new DefaultKafkaConsumerFactory<>( +// consumerConfigs(), +// new StringDeserializer(), +// new StringDeserializer() +// ); +// } +// +// private Map consumerConfigs() { +// Map propsMap = new HashMap<>(); +// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); +// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); +// propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); +// propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); +// return propsMap; +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..eeb9622 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java @@ -0,0 +1,55 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import java.util.HashMap; +//import java.util.Map; +// +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.apache.kafka.common.serialization.StringSerializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.core.DefaultKafkaProducerFactory; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.kafka.core.ProducerFactory; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +//import org.springframework.kafka.support.serializer.JsonSerializer; +// +//@Configuration +//@EnableKafka +//public class KafkaProducerConfig { +// +// @Value("${spring.kafka.producer.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.producer.retries}") +// private int retries; +// @Value("${spring.kafka.producer.batch-size}") +// private int batchSize; +// @Value("${spring.kafka.producer.properties.linger.ms}") +// private int linger; +// @Value("${spring.kafka.producer.buffer-memory}") +// private int bufferMemory; +// +// +// public Map producerConfigs() { +// Map props = new HashMap<>(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// props.put(ProducerConfig.RETRIES_CONFIG, retries); +// props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); +// props.put(ProducerConfig.LINGER_MS_CONFIG, linger); +// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// return props; +// } +// +// public ProducerFactory producerFactory() { +// return new DefaultKafkaProducerFactory<>(producerConfigs()); +// } +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java new file mode 100644 index 0000000..16af70a --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java @@ -0,0 +1,18 @@ +//package com.casic.missiles.modular.system.kafka; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.beans.factory.annotation.Qualifier; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.stereotype.Component; +// +//@Component +//public class KafkaToolProducer { +// +// @Autowired +// @Qualifier("kafkaTemplate") +// private KafkaTemplate kafkaTemplate; +// +// public void send(String topic, String message) { +// kafkaTemplate.send(topic, message); +// } +//} +// diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java index 9ccbd57..0720b08 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java @@ -15,4 +15,6 @@ void processData(String recordValue); + String getLastVaule(String devCode); + } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java index f057982..0282bd0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java @@ -1,5 +1,6 @@ package com.casic.missiles.modular.system.service.impl; +import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; @@ -15,6 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.Date; @@ -38,6 +40,10 @@ private IAlarmRecordsService alarmRecordsService; @Autowired private IBusWellInfoService busWellInfoService; + + @Autowired + private KafkaTemplate kafkaTemplate; + @Override public void processData(String recordValue) { JSONObject json = JSONObject.parseObject(recordValue); @@ -46,10 +52,10 @@ JSONObject jsonObject = (JSONObject) json.get("mBody"); WellRelationDto wellRelationDto = busWellInfoService.getWellRelationDto(devCode); if (mType[0].equals(json.get("mType")) || mType[3].equals(json.get("mType"))) {//存储上报数据 - String cell = null!=jsonObject.get("cell")?jsonObject.get("cell").toString():""; - String pci = null!=jsonObject.get("pci")?jsonObject.get("pci").toString():""; - String rsrp = null!=jsonObject.get("rsrp")?jsonObject.get("rsrp").toString():""; - String snr = null!= jsonObject.get("snr")?jsonObject.get("snr").toString():""; + String cell = null != jsonObject.get("cell") ? jsonObject.get("cell").toString() : ""; + String pci = null != jsonObject.get("pci") ? jsonObject.get("pci").toString() : ""; + String rsrp = null != jsonObject.get("rsrp") ? jsonObject.get("rsrp").toString() : ""; + String snr = null != jsonObject.get("snr") ? jsonObject.get("snr").toString() : ""; JSONArray jsonArray = (JSONArray) jsonObject.get("datas"); for (int i = 0; i < jsonArray.size(); i++) { try { @@ -65,12 +71,24 @@ //是否超阈值报警 Float thresh = alarmRuleService.getAlarmThresh(devCode); - if(null!=thresh&&Float.valueOf(val)>=thresh){ + if (null != thresh && Float.valueOf(val) >= thresh) { - alarmRecordsService.saveAlarmRecord(wellRelationDto!=null?wellRelationDto.getDeviceId():null, - devCode,wellRelationDto!=null?wellRelationDto.getWellCode():"", - "1", NoiseAlarmEnum.OVER_THRESH.getName(),val,thresh+"",upTime,"1", + alarmRecordsService.saveAlarmRecord(wellRelationDto != null ? wellRelationDto.getDeviceId() : null, + devCode, wellRelationDto != null ? wellRelationDto.getWellCode() : "", + "1", NoiseAlarmEnum.OVER_THRESH.getName(), val, thresh + "", upTime, "1", NoiseAlarmEnum.OVER_THRESH.getName()); + + //推送报警至第三方kafka + try { + String lastValue = getLastVaule(devCode); + if (ObjectUtil.isNotEmpty(lastValue)) { + if (Float.valueOf(lastValue) >= thresh) + sendKafka(devCode, val, upTime); + } + } catch (Exception e) { + e.printStackTrace(); + logger.error("推送第三方kafka失败!"); + } } } catch (Exception e) { e.printStackTrace(); @@ -79,4 +97,16 @@ } } } + + + @Override + public String getLastVaule(String devCode) { + return this.baseMapper.getLastVaule(devCode); + } + + public void sendKafka(String devCode, String val, String upTime) { + //构造报警消息 + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":" + devCode + ",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:" + val + "\"],\"logTime\":" + upTime + ",\"bType\":\"NoiseEvent\"},\"ts\":" + System.currentTimeMillis() + "}"; + kafkaTemplate.send("noise_data", temp); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java index 0ad366a..08761b0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/controller/NoiseConfigController.java @@ -1,33 +1,15 @@ package com.casic.missiles.modular.system.controller; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.casic.missiles.core.common.annotion.BussinessLog; -import com.casic.missiles.core.datascope.DataScope; -import com.casic.missiles.core.page.PageFactory; -import com.casic.missiles.model.log.LogObjectHolder; -import com.casic.missiles.model.response.ResponseData; -import com.casic.missiles.model.response.SuccessResponseData; -import com.casic.missiles.modular.system.dto.NoiseConfigRequest; -import com.casic.missiles.modular.system.dto.ReturnDTO; -import com.casic.missiles.modular.system.enums.BusinessExceptionEnum; -import com.casic.missiles.modular.system.exception.BusinessException; -import com.casic.missiles.modular.system.model.BusWellWell; -import com.casic.missiles.modular.system.service.IBusConfigService; -import com.casic.missiles.modular.system.service.IBusDeviceService; -import com.casic.missiles.modular.system.service.IBusWellWellService; -import com.casic.missiles.modular.system.util.ReturnUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataAccessException; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; -import java.util.List; -import java.util.Objects; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; /** *

@@ -42,6 +24,29 @@ @RequestMapping("/noiseConfig") public class NoiseConfigController { + @Autowired + private KafkaTemplate kafkaTemplate; + + + @ApiOperation("分页查询列表") + @PostMapping("/list") + public void list() { + + + String a= "2323232323"; + String b= "20"; + String c= "20240524153529"; + + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":"+a+",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:"+b+"\"],\"logTime\":"+c+",\"bType\":\"NoiseEvent\"},\"ts\":"+System.currentTimeMillis()+"}"; + + JSONObject aaa= JSON.parseObject(temp); + + + kafkaTemplate.send("noise_data", temp); + + +// kafkaProducer.send("noise_data", temp); + } // @Autowired // private IBusConfigService iBusConfigService; // diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java index 11f8317..c16bfe3 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/dao/NoiseMapper.java @@ -1,7 +1,12 @@ package com.casic.missiles.modular.system.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.casic.missiles.modular.system.dto.DeviceSelectDto; import com.casic.missiles.modular.system.model.Noise; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; /** *

@@ -12,5 +17,6 @@ * @since 2024-02-28 */ public interface NoiseMapper extends BaseMapper { - + @Select("SELECT DDATA FROM `data_noise` where DEVCODE= #{devCode} order by UPTIME desc limit 1") + String getLastVaule(@Param("devCode") String devCode); } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java index f5327cc..6e308bc 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumer.java @@ -22,5 +22,6 @@ String recordValue= record.value().toString(); //存库 iNoiseService.processData(recordValue); + } } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000..dc21bea --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaConsumerConfig.java @@ -0,0 +1,67 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +//import org.springframework.kafka.config.KafkaListenerContainerFactory; +//import org.springframework.kafka.core.ConsumerFactory; +//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Configuration +//@EnableKafka +//public class KafkaConsumerConfig { +// +// @Value("${spring.kafka.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.consumer.enable-auto-commit}") +// private boolean enableAutoCommit; +// @Value("${spring.kafka.consumer.properties.session.timeout.ms}") +// private String sessionTimeout; +// @Value("${spring.kafka.consumer.auto-commit-interval}") +// private String autoCommitInterval; +// @Value("${spring.kafka.consumer.group-id}") +// private String groupId; +// @Value("${spring.kafka.consumer.auto-offset-reset}") +// private String autoOffsetReset; +// @Value("${spring.kafka.consumer.concurrency}") +// private int concurrency; +// +// @Bean +// public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { +// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +// factory.setConsumerFactory(consumerFactory()); +// factory.setConcurrency(concurrency); +// factory.getContainerProperties().setPollTimeout(1500); +// return factory; +// } +// +// private ConsumerFactory consumerFactory() { +// return new DefaultKafkaConsumerFactory<>( +// consumerConfigs(), +// new StringDeserializer(), +// new StringDeserializer() +// ); +// } +// +// private Map consumerConfigs() { +// Map propsMap = new HashMap<>(); +// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); +// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); +// propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); +// propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +// propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); +// propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); +// return propsMap; +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..eeb9622 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaProducerConfig.java @@ -0,0 +1,55 @@ +//package com.casic.missiles.modular.system.kafka; +// +//import java.util.HashMap; +//import java.util.Map; +// +//import org.apache.kafka.clients.producer.ProducerConfig; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.apache.kafka.common.serialization.StringSerializer; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.kafka.annotation.EnableKafka; +//import org.springframework.kafka.core.DefaultKafkaProducerFactory; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.kafka.core.ProducerFactory; +//import org.springframework.kafka.support.serializer.JsonDeserializer; +//import org.springframework.kafka.support.serializer.JsonSerializer; +// +//@Configuration +//@EnableKafka +//public class KafkaProducerConfig { +// +// @Value("${spring.kafka.producer.bootstrap-servers}") +// private String servers; +// @Value("${spring.kafka.producer.retries}") +// private int retries; +// @Value("${spring.kafka.producer.batch-size}") +// private int batchSize; +// @Value("${spring.kafka.producer.properties.linger.ms}") +// private int linger; +// @Value("${spring.kafka.producer.buffer-memory}") +// private int bufferMemory; +// +// +// public Map producerConfigs() { +// Map props = new HashMap<>(); +// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); +// props.put(ProducerConfig.RETRIES_CONFIG, retries); +// props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); +// props.put(ProducerConfig.LINGER_MS_CONFIG, linger); +// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); +// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +// return props; +// } +// +// public ProducerFactory producerFactory() { +// return new DefaultKafkaProducerFactory<>(producerConfigs()); +// } +// +// @Bean +// public KafkaTemplate kafkaTemplate() { +// return new KafkaTemplate<>(producerFactory()); +// } +//} \ No newline at end of file diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java new file mode 100644 index 0000000..16af70a --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/kafka/KafkaToolProducer.java @@ -0,0 +1,18 @@ +//package com.casic.missiles.modular.system.kafka; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.beans.factory.annotation.Qualifier; +//import org.springframework.kafka.core.KafkaTemplate; +//import org.springframework.stereotype.Component; +// +//@Component +//public class KafkaToolProducer { +// +// @Autowired +// @Qualifier("kafkaTemplate") +// private KafkaTemplate kafkaTemplate; +// +// public void send(String topic, String message) { +// kafkaTemplate.send(topic, message); +// } +//} +// diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java index 9ccbd57..0720b08 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/INoiseService.java @@ -15,4 +15,6 @@ void processData(String recordValue); + String getLastVaule(String devCode); + } diff --git a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java index f057982..0282bd0 100644 --- a/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java +++ b/casic-server/src/main/java/com/casic/missiles/modular/system/service/impl/NoiseServiceImpl.java @@ -1,5 +1,6 @@ package com.casic.missiles.modular.system.service.impl; +import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; @@ -15,6 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.Date; @@ -38,6 +40,10 @@ private IAlarmRecordsService alarmRecordsService; @Autowired private IBusWellInfoService busWellInfoService; + + @Autowired + private KafkaTemplate kafkaTemplate; + @Override public void processData(String recordValue) { JSONObject json = JSONObject.parseObject(recordValue); @@ -46,10 +52,10 @@ JSONObject jsonObject = (JSONObject) json.get("mBody"); WellRelationDto wellRelationDto = busWellInfoService.getWellRelationDto(devCode); if (mType[0].equals(json.get("mType")) || mType[3].equals(json.get("mType"))) {//存储上报数据 - String cell = null!=jsonObject.get("cell")?jsonObject.get("cell").toString():""; - String pci = null!=jsonObject.get("pci")?jsonObject.get("pci").toString():""; - String rsrp = null!=jsonObject.get("rsrp")?jsonObject.get("rsrp").toString():""; - String snr = null!= jsonObject.get("snr")?jsonObject.get("snr").toString():""; + String cell = null != jsonObject.get("cell") ? jsonObject.get("cell").toString() : ""; + String pci = null != jsonObject.get("pci") ? jsonObject.get("pci").toString() : ""; + String rsrp = null != jsonObject.get("rsrp") ? jsonObject.get("rsrp").toString() : ""; + String snr = null != jsonObject.get("snr") ? jsonObject.get("snr").toString() : ""; JSONArray jsonArray = (JSONArray) jsonObject.get("datas"); for (int i = 0; i < jsonArray.size(); i++) { try { @@ -65,12 +71,24 @@ //是否超阈值报警 Float thresh = alarmRuleService.getAlarmThresh(devCode); - if(null!=thresh&&Float.valueOf(val)>=thresh){ + if (null != thresh && Float.valueOf(val) >= thresh) { - alarmRecordsService.saveAlarmRecord(wellRelationDto!=null?wellRelationDto.getDeviceId():null, - devCode,wellRelationDto!=null?wellRelationDto.getWellCode():"", - "1", NoiseAlarmEnum.OVER_THRESH.getName(),val,thresh+"",upTime,"1", + alarmRecordsService.saveAlarmRecord(wellRelationDto != null ? wellRelationDto.getDeviceId() : null, + devCode, wellRelationDto != null ? wellRelationDto.getWellCode() : "", + "1", NoiseAlarmEnum.OVER_THRESH.getName(), val, thresh + "", upTime, "1", NoiseAlarmEnum.OVER_THRESH.getName()); + + //推送报警至第三方kafka + try { + String lastValue = getLastVaule(devCode); + if (ObjectUtil.isNotEmpty(lastValue)) { + if (Float.valueOf(lastValue) >= thresh) + sendKafka(devCode, val, upTime); + } + } catch (Exception e) { + e.printStackTrace(); + logger.error("推送第三方kafka失败!"); + } } } catch (Exception e) { e.printStackTrace(); @@ -79,4 +97,16 @@ } } } + + + @Override + public String getLastVaule(String devCode) { + return this.baseMapper.getLastVaule(devCode); + } + + public void sendKafka(String devCode, String val, String upTime) { + //构造报警消息 + String temp = "{\"mType\":\"Event\",\"devType\":\"Noise\",\"devCode\":" + devCode + ",\"mBody\":{\"eventType\":[\"NoiseDataAlarm:" + val + "\"],\"logTime\":" + upTime + ",\"bType\":\"NoiseEvent\"},\"ts\":" + System.currentTimeMillis() + "}"; + kafkaTemplate.send("noise_data", temp); + } } diff --git a/casic-web/src/main/resources/config/application-dev.yml b/casic-web/src/main/resources/config/application-dev.yml index 7f86b29..a1e458d 100644 --- a/casic-web/src/main/resources/config/application-dev.yml +++ b/casic-web/src/main/resources/config/application-dev.yml @@ -11,12 +11,13 @@ bootstrap-servers: 111.198.10.15:12502 #这个是kafka的地址,对应你server.properties中配置的 producer: batch-size: 16384 #批量大小 - acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) + acks: all #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) retries: 3 # 消息发送重试次数 #transaction-id-prefix: transaction buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer + bootstrap-servers: 60.208.85.83:29092 properties: linger: ms: 2000 #提交延迟 @@ -32,6 +33,7 @@ # none:只要有一个分区不存在已提交的offset,就抛出异常; auto-offset-reset: latest max-poll-records: 500 #单次拉取消息的最大条数 + concurrency: 10 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: