diff --git a/casic-iot-web/src/main/resources/config/application-dev.yml b/casic-iot-web/src/main/resources/config/application-dev.yml
index bea40b6..89786d7 100644
--- a/casic-iot-web/src/main/resources/config/application-dev.yml
+++ b/casic-iot-web/src/main/resources/config/application-dev.yml
@@ -36,13 +36,13 @@ redis:
   invalid-time: 86400
   config-prefix: 'Casic:'
-  sysUrl: /sys
 #kaptcha-open: false         # whether to enable the login captcha (true/false)
-  no-login-urls: ${casic.sysUrl}/user/login,${casic.sysUrl}/user/appLogin,${casic.sysUrl}/kaptcha/base64,${casic.sysUrl}/config/baseConfig,/route/mockToken,/v2/api-docs,/v2/api-docs-ext,/doc.html,/cockpit/**,/websocket/**,/webjars/**,/swagger-ui.html,/swagger-resources,/push/data
+  no-login-urls: ${casic.sysUrl}/user/login,${casic.sysUrl}/user/appLogin,${casic.sysUrl}/kaptcha/base64,${casic.sysUrl}/config/baseConfig,/route/mockToken,/v2/api-docs,/v2/api-docs-ext,/doc.html,/cockpit/**,/websocket/**,/webjars/**,/swagger-ui.html,/swagger-resources,/push/data,/push/test
 
 # flowable data source and multi-data-source configuration
 db:
   init:
     enable: false
+
 logging:
   level.root: error
   level.com.casic: debug
diff --git a/casic-iot-web/src/main/resources/config/application.yml b/casic-iot-web/src/main/resources/config/application.yml
index 8bd96af..6d35105 100644
--- a/casic-iot-web/src/main/resources/config/application.yml
+++ b/casic-iot-web/src/main/resources/config/application.yml
@@ -70,6 +70,9 @@ sensorhub:
   config:
     port: 7091
+    subscribe:
+      bean: "functionCallback"
+      url: "http://127.0.0.1:7093/push/test"
 # code generator configuration
 code:
diff --git a/sensorhub-core/pom.xml b/sensorhub-core/pom.xml
index af9af9b..ada8f37 100644
--- a/sensorhub-core/pom.xml
+++ b/sensorhub-core/pom.xml
@@ -56,6 +56,12 @@
+        <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+            <version>3.1</version>
+        </dependency>
+
         <dependency>
             <groupId>com.casic</groupId>
             <artifactId>casic-iot-service</artifactId>
             <version>${iot.version}</version>
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerConfig.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerConfig.java
deleted file mode 100644
index e494726..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,100 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.CommonClientConfigs;
-//import org.apache.kafka.clients.consumer.ConsumerConfig;
-//import org.apache.kafka.common.config.SaslConfigs;
-//import org.apache.kafka.common.security.auth.SecurityProtocol;
-//import org.apache.kafka.common.serialization.StringDeserializer;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.boot.SpringBootConfiguration;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-//import org.springframework.kafka.config.KafkaListenerContainerFactory;
-//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-//import org.springframework.kafka.listener.ContainerProperties;
-//
-//import java.util.HashMap;
-//import java.util.Map;
-//
-//
-///**
-// * @author cz
-// */
-//@SpringBootConfiguration
-//public class KafkaConsumerConfig {
-//
-//    @Value("${spring.kafka.consumer.bootstrap-servers}")
-//    private String bootstrapServers;
-//    @Value("${spring.kafka.consumer.group-id}")
-//    private String groupId;
-//    @Value("${spring.kafka.consumer.enable-auto-commit}")
-//    private boolean enableAutoCommit;
-//    @Value("${spring.kafka.properties.session.timeout.ms}")
-//    private String sessionTimeout;
-//    @Value("${spring.kafka.properties.max.poll.interval.ms}")
-//    private String maxPollIntervalTime;
-//    @Value("${spring.kafka.consumer.max-poll-records}")
-//    private String maxPollRecords;
-//    @Value("${spring.kafka.consumer.auto-offset-reset}")
-//    private String autoOffsetReset;
-//    @Value("${spring.kafka.listener.concurrency}")
-//    private Integer concurrency;
-//    @Value("${spring.kafka.listener.missing-topics-fatal}")
-//    private boolean missingTopicsFatal;
-//
-//    private final long pollTimeout = 600000;
-//
-//    @Bean
-//    public Map<String, Object> consumerConfigs() {
-//        Map<String, Object> propsMap = new HashMap<>(16);
-//        // broker address, taken straight from the configuration
-//        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-//        // consumer group id, likewise taken from the configuration
-//        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-//        // whether to auto-commit offsets (default true); set false and commit manually to avoid duplicated or lost records
-//        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
-//        // auto-commit interval, effective only when auto-commit is enabled
-//        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
-//        // what to do when a partition has no committed offset, or the offset is invalid:
-//        // we use latest: resume from the committed offset when one exists, otherwise consume only records produced from now on
-//        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
-//        // maximum interval between two polls, 5 minutes by default; exceeding it triggers a rebalance
-//        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
-//        // maximum number of records a single poll may return, 500 by default
-//        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
-//        // how long the broker waits without a consumer heartbeat before triggering a rebalance, 10 s by default
-//        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
-//        // deserializers (StringDeserializer here, matching the producer)
-//        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-//        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-//        // the four settings below are for username/password authentication; remove them when no credentials are needed
-////        propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
-////        propsMap.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-////        propsMap.put("java.security.auth.login.config", "10000");
-//        // username and password are hard-coded here; they could be moved to the nacos configuration instead
-////        propsMap.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";"));
-//        return propsMap;
-//    }
-//
-//    // consumer factory, loaded with the configuration above
-//    @Bean("consumerFactory")
-//    public DefaultKafkaConsumerFactory consumerFactory() {
-//        return new DefaultKafkaConsumerFactory(consumerConfigs());
-//    }
-//
-//    @Bean("listenerContainerFactory")
-//    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
-//        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
-//        factory.setConsumerFactory(consumerFactory());
-//        // number of threads in the listener container, usually machines * partitions
-//        factory.setConcurrency(concurrency);
-//        // by default an error is raised when a listened-to topic does not exist; false ignores it
-//        factory.getContainerProperties().setMissingTopicsFatal(missingTopicsFatal);
-//        // auto-commit is disabled, so manual acknowledgment has to be configured
-//        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
-//        factory.getContainerProperties().setPollTimeout(pollTimeout);
-//        return factory;
-//    }
-//}
-//
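Note: the new sensorhub.config.subscribe entry in application.yml names a Spring bean ("functionCallback") and a push URL, and /push/test is whitelisted in no-login-urls so that push can be reached without a session. The bean itself is not part of this diff; the sketch below is only an illustration of what such a subscriber might look like, using the commons-httpclient 3.1 dependency added in sensorhub-core/pom.xml. The class name, the onData entry point, and the property binding are assumptions, not code from this repository.

    package com.casic.missiles.autoconfig;

    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.methods.PostMethod;
    import org.apache.commons.httpclient.methods.StringRequestEntity;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    // Hypothetical subscriber bean matching sensorhub.config.subscribe.bean.
    @Component("functionCallback")
    public class FunctionCallback {

        // Assumed to bind to sensorhub.config.subscribe.url from application.yml.
        @Value("${sensorhub.config.subscribe.url}")
        private String subscribeUrl;

        // Forward a pushed JSON payload to the configured endpoint.
        public void onData(String json) {
            PostMethod post = new PostMethod(subscribeUrl);
            try {
                post.setRequestEntity(new StringRequestEntity(json, "application/json", "UTF-8"));
                int status = new HttpClient().executeMethod(post);
                System.out.println("push to " + subscribeUrl + " returned " + status);
            } catch (Exception e) {
                System.out.println("push failed: " + e.getMessage());
            } finally {
                post.releaseConnection();
            }
        }
    }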
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java
deleted file mode 100644
index 6600312..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java
+++ /dev/null
@@ -1,31 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecord;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-//import org.springframework.kafka.support.Acknowledgment;
-//import org.springframework.web.bind.annotation.RestController;
-//
-//import javax.annotation.Resource;
-//
-//@RestController()
-//public class KafkaConsumerListener {
-//    @Resource
-//    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
-//
-//    /**
-//     * Listen for Kafka messages.
-//     *
-//     * An id must be specified when autoStartup = "false" is used.
-//     */
-//    @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = {"KAFKA_TEST_TOPICS"}, autoStartup = "true")
-//    public void listenTopics(ConsumerRecord consumerRecord, Acknowledgment ack) {
-//        try {
-//            System.out.println("listenTopics received message: " + consumerRecord.value());
-//            // manual acknowledgment
-//            ack.acknowledge();
-//        } catch (Exception e) {
-//            System.out.println("consumption failed: " + e);
-//        }
-//    }
-//
-//}
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java
deleted file mode 100644
index dbc561c..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java
+++ /dev/null
@@ -1,29 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import edu.umd.cs.findbugs.annotations.NonNull;
-//import org.apache.kafka.clients.consumer.Consumer;
-//import org.springframework.kafka.listener.KafkaListenerErrorHandler;
-//import org.springframework.kafka.listener.ListenerExecutionFailedException;
-//import org.springframework.messaging.Message;
-//import org.springframework.stereotype.Component;
-//
-//@Component
-//public class KafkaConsumerListenerError implements KafkaListenerErrorHandler {
-//
-//
-//    @Override
-//    @NonNull
-//    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
-//        return new Object();
-//    }
-//
-//    @Override
-//    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
-//        System.out.println("message details: " + message);
-//        System.out.println("exception: " + exception);
-//        System.out.println("consumer details: " + consumer.groupMetadata());
-//        System.out.println("listened topics: " + consumer.listTopics());
-//        return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
-//    }
-//
-//}
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaProviderConfig.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaProviderConfig.java
deleted file mode 100644
index 1d605ed..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaProviderConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.CommonClientConfigs;
-//import org.apache.kafka.clients.producer.ProducerConfig;
-//import org.apache.kafka.common.config.SaslConfigs;
-//import org.apache.kafka.common.security.auth.SecurityProtocol;
-//import org.apache.kafka.common.serialization.StringSerializer;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.boot.SpringBootConfiguration;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Primary;
-//import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.kafka.core.ProducerFactory;
-//import org.springframework.kafka.transaction.KafkaTransactionManager;
-//
-//import java.util.HashMap;
-//import java.util.Map;
-//
-///**
-// * @author cz
-// */
-//@SpringBootConfiguration
-//public class KafkaProviderConfig {
-//
-//    @Value("${spring.kafka.producer.bootstrap-servers}")
-//    private String bootstrapServers;
-//    @Value("${spring.kafka.producer.acks}")
-//    private String acks;
-//    @Value("${spring.kafka.producer.retries}")
-//    private String retries;
-//    @Value("${spring.kafka.producer.batch-size}")
-//    private String batchSize;
-//    @Value("${spring.kafka.producer.buffer-memory}")
-//    private String bufferMemory;
-//
-//    @Bean
-//    public Map<String, Object> producerConfigs() {
-//        Map<String, Object> props = new HashMap<>(16);
-//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-//        // acknowledgment mode; with acks=all the producer only gets a success response once every replicating node has received the message
-//        props.put(ProducerConfig.ACKS_CONFIG, acks);
-//        // number of resends after an error; must be greater than 0 when transactions are enabled
-//        props.put(ProducerConfig.RETRIES_CONFIG, retries);
-//        // when several messages target the same partition, the producer batches them instead of sending one by one, reducing round trips
-//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
-//        // when traffic is light the 16 KB batch may not fill for a long time (say 5 min), adding latency, so a time bound is also set: once it elapses the batch is sent anyway
-//        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
-//        // size of the producer's memory buffer
-//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
-//        // serializers, matching the consumer's deserializers
-//        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-//        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-//
-//        // username/password settings; remove when no credentials are required
-////        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
-////        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-////        props.put("java.security.auth.login.config", "10000");
-//        // could also be kept in the nacos configuration
-////        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";"));
-//        return props;
-//    }
-//
-//    // producer factory
-//    @Bean("kafkaProduceFactory")
-//    public ProducerFactory<Object, Object> producerFactory() {
-//        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
-//        factory.setTransactionIdPrefix("kafkaTx-");
-//        return factory;
-//    }
-//
-//    // transaction handling
-//    // this transaction manager can conflict with the project's other transactions, so I usually drop the @Bean and skip the Spring proxy
-//    @Bean("kafkaTransactionManager")
-//    @Primary
-//    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
-//        return new KafkaTransactionManager<>(producerFactory);
-//    }
-//
-//    @Bean
-//    public KafkaTemplate<Object, Object> kafkaTemplate() {
-//        return new KafkaTemplate<>(producerFactory());
-//    }
-//
-//}
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java
index 1598ee0..1fdac64 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java
@@ -1,22 +1,22 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.producer.ProducerRecord;
-//import org.apache.kafka.clients.producer.RecordMetadata;
-//import org.springframework.kafka.support.ProducerListener;
-//import org.springframework.stereotype.Component;
-//
-//import javax.annotation.Nullable;
-//
-//@Component
-//public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
-//
-//    @Override
-//    public void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {
-//        System.out.println("message sent successfully: " + producerRecord.toString());
-//    }
-//
-//    @Override
-//    public void onError(ProducerRecord<Object, Object> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
-//        System.out.println("message send failed: " + producerRecord.toString() + exception.getMessage());
-//    }
-//}
+package com.casic.missiles.autoconfig;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.kafka.support.ProducerListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Nullable;
+
+@Component
+public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
+
+    @Override
+    public void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {
+        System.out.println("message sent successfully: " + producerRecord.toString());
+    }
+
+    @Override
+    public void onError(ProducerRecord<Object, Object> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
+        System.out.println("message send failed: " + producerRecord.toString() + exception.getMessage());
+    }
+}
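The re-enabled KafkaSendResultHandler only fires once it is registered as the producer listener on a KafkaTemplate. Since this diff also deletes KafkaProviderConfig, which used to build the template, here is a minimal wiring sketch, assuming a ProducerFactory bean is still defined elsewhere in the project; the KafkaTemplateConfig class itself is hypothetical.

    package com.casic.missiles.autoconfig;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaTemplateConfig {

        // Register the handler so its onSuccess/onError callbacks run for every send.
        @Bean
        public KafkaTemplate<Object, Object> kafkaTemplate(ProducerFactory<Object, Object> producerFactory,
                                                           KafkaSendResultHandler sendResultHandler) {
            KafkaTemplate<Object, Object> template = new KafkaTemplate<>(producerFactory);
            template.setProducerListener(sendResultHandler);
            return template;
        }
    }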
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerConfig.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerConfig.java
deleted file mode 100644
index e494726..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,100 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.CommonClientConfigs;
-//import org.apache.kafka.clients.consumer.ConsumerConfig;
-//import org.apache.kafka.common.config.SaslConfigs;
-//import org.apache.kafka.common.security.auth.SecurityProtocol;
-//import org.apache.kafka.common.serialization.StringDeserializer;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.boot.SpringBootConfiguration;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-//import org.springframework.kafka.config.KafkaListenerContainerFactory;
-//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-//import org.springframework.kafka.listener.ContainerProperties;
-//
-//import java.util.HashMap;
-//import java.util.Map;
-//
-//
-///**
-// * @author cz
-// */
-//@SpringBootConfiguration
-//public class KafkaConsumerConfig {
-//
-//    @Value("${spring.kafka.consumer.bootstrap-servers}")
-//    private String bootstrapServers;
-//    @Value("${spring.kafka.consumer.group-id}")
-//    private String groupId;
-//    @Value("${spring.kafka.consumer.enable-auto-commit}")
-//    private boolean enableAutoCommit;
-//    @Value("${spring.kafka.properties.session.timeout.ms}")
-//    private String sessionTimeout;
-//    @Value("${spring.kafka.properties.max.poll.interval.ms}")
-//    private String maxPollIntervalTime;
-//    @Value("${spring.kafka.consumer.max-poll-records}")
-//    private String maxPollRecords;
-//    @Value("${spring.kafka.consumer.auto-offset-reset}")
-//    private String autoOffsetReset;
-//    @Value("${spring.kafka.listener.concurrency}")
-//    private Integer concurrency;
-//    @Value("${spring.kafka.listener.missing-topics-fatal}")
-//    private boolean missingTopicsFatal;
-//
-//    private final long pollTimeout = 600000;
-//
-//    @Bean
-//    public Map<String, Object> consumerConfigs() {
-//        Map<String, Object> propsMap = new HashMap<>(16);
-//        // Broker address.
-//        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-//        // Consumer group id.
-//        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-//        // Whether to auto-commit offsets (default true); set to false and commit manually to avoid duplicates and data loss.
-//        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
-//        // Auto-commit interval; only effective when auto-commit is enabled.
-//        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
-//        // What the consumer does when a partition has no committed offset or the offset is invalid:
-//        // we use latest - consume from the committed offset when one exists, otherwise only newly produced records.
-//        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
-//        // Maximum interval between two polls (default 5 minutes); exceeding it triggers a rebalance.
-//        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
-//        // Maximum number of records a single poll may fetch (default 500).
-//        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
-//        // How long the broker waits without a consumer heartbeat before triggering a rebalance (default 10s).
-//        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
-//        // Deserialization (StringDeserializer here, matching the producer).
-//        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-//        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-//        // The next four settings are for username/password auth; remove them if no credentials are needed.
-////        propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
-////        propsMap.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-////        propsMap.put("java.security.auth.login.config", "10000");
-//        // Username and password are hard-coded here; could be moved to nacos config.
-////        propsMap.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";"));
-//        return propsMap;
-//    }
-//
-//    // Consumer factory loading the config map.
-//    @Bean("consumerFactory")
-//    public DefaultKafkaConsumerFactory consumerFactory() {
-//        return new DefaultKafkaConsumerFactory(consumerConfigs());
-//    }
-//
-//    @Bean("listenerContainerFactory")
-//    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
-//        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
-//        factory.setConsumerFactory(consumerFactory());
-//        // Number of threads in the listener container, usually machines * partitions.
-//        factory.setConcurrency(concurrency);
-//        // A missing topic is fatal by default, so set false to ignore it.
-//        factory.getContainerProperties().setMissingTopicsFatal(missingTopicsFatal);
-//        // With auto-commit off, manual acknowledgment must be configured.
-//        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
-//        factory.getContainerProperties().setPollTimeout(pollTimeout);
-//        return factory;
-//    }
-//}
-//
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java
deleted file mode 100644
index 6600312..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java
+++ /dev/null
@@ -1,31 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecord;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-//import org.springframework.kafka.support.Acknowledgment;
-//import org.springframework.web.bind.annotation.RestController;
-//
-//import javax.annotation.Resource;
-//
-//@RestController()
-//public class KafkaConsumerListener {
-//    @Resource
-//    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
-//
-//    /**
-//     * Listen for Kafka messages.
-//     *
-//     * With autoStartup = "false" an id must be specified.
-//     */
-//    @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = {"KAFKA_TEST_TOPICS"}, autoStartup = "true")
-//    public void listenTopics(ConsumerRecord consumerRecord, Acknowledgment ack) {
-//        try {
-//            System.out.println("listenTopics received message: " + consumerRecord.value());
-//            // manual ack
-//            ack.acknowledge();
-//        } catch (Exception e) {
-//            System.out.println("Consume failed: " + e);
-//        }
-//    }
-//
-//}
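Note: the Javadoc in the deleted listener stresses that with autoStartup = "false" the @KafkaListener id is mandatory, because the container then has to be started through the registry it injects. A minimal, hypothetical sketch of that pattern (helper class and listener id are made up, not part of the diff):

package com.casic.missiles.autoconfig;

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ListenerControlSketch {

    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    // Look the container up by its @KafkaListener id and start it on demand.
    public void startListener(String listenerId) {
        MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}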
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java
index 1b1a261..3c58fe0 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java
@@ -18,4 +18,7 @@
+
+
+
 }
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java
new file mode 100644
index 0000000..da7916e
--- /dev/null
+++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java
@@ -0,0 +1,19 @@
+package com.casic.missiles.autoconfig;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+@ConfigurationProperties(prefix = "sensorhub.subscribe")
+public class SensorhubSenderProperties {
+
+    /**
+     * Subscription bean name
+     */
+    private String bean = "kafka";
+
+    private String url;
+
+}
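Note: application.yml names a "functionCallback" bean and points sensorhub.subscribe.url at http://127.0.0.1:7093/push/test, and sensorhub-core/pom.xml now pulls in commons-httpclient 3.1, but the callback implementation itself is not part of this diff. A hypothetical sketch of such a sender binding to the new properties, assuming the payload is already serialized JSON (class name and push method are made up):

package com.casic.missiles.autoconfig;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component("functionCallback")
public class FunctionCallbackSketch {

    @Resource
    private SensorhubSenderProperties senderProperties;

    // POST the JSON payload to the configured subscribe URL and return the HTTP status code.
    public int push(String json) throws Exception {
        HttpClient client = new HttpClient();
        PostMethod post = new PostMethod(senderProperties.getUrl());
        post.setRequestEntity(new StringRequestEntity(json, "application/json", "UTF-8"));
        try {
            return client.executeMethod(post);
        } finally {
            post.releaseConnection();
        }
    }
}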
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java
index dfed36a..26b0b61 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java
@@ -31,4 +31,12 @@
         return responseData;
     }
 
+    @RequestMapping("/test")
+    public Object testNbCallBack(@RequestBody Map<String, Object> dataMap) {
+        ResponseData responseData = new ResponseData();
+        System.out.println("----------" + JSON.toJSON(dataMap));
+        responseData.setCode(200);
+        return responseData;
+    }
+
 }
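Note: /push/test is added to no-login-urls in application-dev.yml and is the target of sensorhub.subscribe.url, so it can be exercised without logging in; the controller just logs the posted map and answers with code 200. A minimal smoke test, assuming Jackson is on the classpath so the map is sent as JSON (the payload key is made up):

import org.springframework.web.client.RestTemplate;

import java.util.HashMap;
import java.util.Map;

public class PushTestSmoke {
    public static void main(String[] args) {
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("devcode", "0001");
        // Expected reply: the ResponseData JSON with code 200.
        String reply = new RestTemplate().postForObject("http://127.0.0.1:7093/push/test", dataMap, String.class);
        System.out.println(reply);
    }
}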
diff --git
a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java index 1b1a261..3c58fe0 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java @@ -18,4 +18,7 @@ + + + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java new file mode 100644 index 0000000..da7916e --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java @@ -0,0 +1,19 @@ +package com.casic.missiles.autoconfig; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "sensorhub.subscribe") +public class SensorhubSenderProperties { + + /** + * 订阅bean + */ + private String bean = "kafka"; + + private String url; + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index dfed36a..26b0b61 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -31,4 +31,12 @@ return responseData; } + @RequestMapping("/test") + public Object testNbCallBack(@RequestBody Map dataMap) { + ResponseData responseData = new ResponseData(); + System.out.println("----------" + JSON.toJSON(dataMap)); + responseData.setCode(200); + return responseData; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index ec9a04f..36cbe02 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -54,7 +54,6 @@ if (ObjectUtil.isEmpty(protocolConfig)) { return null; } - ParseResult result = null; String devcode = null; //暂时先取第一个, 减少类的创建销毁与构建 @@ -122,7 +121,7 @@ } ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); //存储数据 -// datagramEventProvider.storeData(bizDataMap); + datagramEventProvider.storeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 diff --git a/casic-iot-web/src/main/resources/config/application-dev.yml b/casic-iot-web/src/main/resources/config/application-dev.yml index bea40b6..89786d7 100644 --- a/casic-iot-web/src/main/resources/config/application-dev.yml +++ b/casic-iot-web/src/main/resources/config/application-dev.yml @@ -36,13 +36,13 @@ redis: invalid-time: 86400 config-prefix: 'Casic:' - sysUrl: /sys #kaptcha-open: false #是否开启登录时验证码 (true/false) - no-login-urls: ${casic.sysUrl}/user/login,${casic.sysUrl}/user/appLogin,${casic.sysUrl}/kaptcha/base64,${casic.sysUrl}/config/baseConfig,/route/mockToken,/v2/api-docs,/v2/api-docs-ext,/doc.html,/cockpit/**,/websocket/**,/webjars/**,/swagger-ui.html,/swagger-resources,/push/data + no-login-urls: 
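Taken together, the changes so far introduce a configurable push subscription: sensorhub.subscribe.bean selects which DataSubscribeProvider implementation handles parsed data, and sensorhub.subscribe.url is the HTTP callback target (the new /push/test endpoint above acts as a loopback receiver and is whitelisted in no-login-urls). A minimal sketch of how the configured bean name might be resolved follows; the SubscribeDispatcher class and the name-based SpringContextUtil.getBean(String) overload are illustrative assumptions, not part of this changeset.

package com.casic.missiles.parser.sender;

import com.casic.missiles.autoconfig.SensorhubSenderProperties;
import com.casic.missiles.util.SpringContextUtil;

import java.util.List;
import java.util.Map;

// Hypothetical dispatcher: looks up the subscriber chosen by sensorhub.subscribe.bean
// ("kafka" by default, "functionCallback" in the application.yml change above).
public class SubscribeDispatcher {

    public void dispatch(List<Map<String, Object>> bizDataMapList) {
        SensorhubSenderProperties props = SpringContextUtil.getBean(SensorhubSenderProperties.class);
        // Assumed name-based overload; only the Class-based lookup appears in this changeset.
        DataSubscribeProvider provider = (DataSubscribeProvider) SpringContextUtil.getBean(props.getBean());
        // Second argument is the per-subscription detail config; null here for brevity.
        provider.publishDataSubscribe(bizDataMapList, null);
    }
}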
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java
new file mode 100644
index 0000000..d4918eb
--- /dev/null
+++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java
@@ -0,0 +1,73 @@
+package com.casic.missiles.parser.sender;
+
+import cn.hutool.core.date.DateUtil;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author cz
+ * @date 2024-03-26
+ */
+public class SenderSupport {
+
+
+    /**
+     * Build the payload structure for the different device types
+     *
+     * @param bizDataMapList initial result set the payload is built from
+     */
+    protected Map<String, Object> buildTypeDeviceData(List<Map<String, Object>> bizDataMapList) {
+        Map<String, Object> contentMap = new HashMap<>(), mBody = new HashMap<>();
+        Map<String, Object> bizDataMap = bizDataMapList.get(0);
+        switch ((Integer) bizDataMap.get("deviceType")) {
+            case 32:
+                // Build the config-reply event for each device type
+                contentMap.put("devType", "GasDetector");
+                setEventType(bizDataMap, contentMap, mBody, "GasConfigSuccess", "GasConfigFail");
+                break;
+            case 31:
+                contentMap.put("devType", "Pressure");
+                setEventType(bizDataMap, contentMap, mBody, "PressureConfigSuccess", "PressureConfigFail");
+        }
+        // Build the data-reply type
+        if (bizDataMap.containsKey("dataValue")) {
+            contentMap.put("mType", "Data");
+            if (bizDataMap.containsKey("cell")) {
+                mBody.put("cell", bizDataMap.get("cell"));
+            }
+            mBody.put("datas", bizDataMapList);
+        }
+        // Build the startup registration structure (IMEI/ICCID upload)
+        if (bizDataMap.containsKey("imei")) {
+            contentMap.put("mType", "StartupRequest");
+            mBody.put("iccid", bizDataMap.get("iccid"));
+            mBody.put("imei", bizDataMap.get("imei"));
+        }
+        if (bizDataMap.containsKey("devcode")) {
+            contentMap.put("devCode", bizDataMap.get("devcode"));
+        }
+        contentMap.put("mBody", mBody);
+        return contentMap;
+    }
+
+    /**
+     * Set the reply type for issued configuration
+     */
+    private void setEventType(Map<String, Object> bizDataMap, Map<String, Object> contentMap, Map<String, Object> mBody, String bTypeSuccess, String bTypeFail) {
+        if (bizDataMap.containsKey("config")) {
+            contentMap.put("mType", "SetResponse");
+            contentMap.put("ts", DateUtil.format(new Date(), "yyyyMMddHHmmss"));
+            if ("1".equals(bizDataMap.get("config"))) {
+                mBody.put("bType", bTypeSuccess);
+            } else {
+                mBody.put("bType", bTypeFail);
+            }
+        }
+    }
+
+
+}
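For reference, a small driver showing the payload shape buildTypeDeviceData produces for a gas-detector reading; the input keys follow the containsKey checks above, the device code is made up, and the subclass exists only because the method is protected.

import com.casic.missiles.parser.sender.SenderSupport;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: exercise buildTypeDeviceData with a minimal gas-detector data report.
public class SenderSupportDemo extends SenderSupport {

    public static void main(String[] args) {
        Map<String, Object> reading = new HashMap<>();
        reading.put("deviceType", 32);           // 32 -> devType "GasDetector"
        reading.put("devcode", "312024030001");  // hypothetical device code
        reading.put("dataValue", 1.75);          // presence of dataValue -> mType "Data"
        reading.put("cell", 87);                 // battery level, copied into mBody

        List<Map<String, Object>> batch = new ArrayList<>();
        batch.add(reading);

        Map<String, Object> payload = new SenderSupportDemo().buildTypeDeviceData(batch);
        // Expected shape:
        // {devType=GasDetector, mType=Data, devCode=312024030001,
        //  mBody={cell=87, datas=[{deviceType=32, devcode=..., dataValue=1.75, cell=87}]}}
        System.out.println(payload);
    }
}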
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java
index 787874a..0b2667a 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java
@@ -1,8 +1,11 @@
 package com.casic.missiles.parser.sender.impl;
 
-import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSON;
+import com.casic.missiles.autoconfig.SensorhubSenderProperties;
 import com.casic.missiles.parser.sender.DataSubscribeProvider;
+import com.casic.missiles.parser.sender.SenderSupport;
 import com.casic.missiles.pojo.SubscribeDetailConfig;
+import com.casic.missiles.util.SpringContextUtil;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ContentType;
@@ -10,24 +13,36 @@
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 @Component("functionCallback")
-public class FunctionCallback implements DataSubscribeProvider {
+public class FunctionCallback extends SenderSupport implements DataSubscribeProvider {
+
     @Override
-    public void publishDataSubscribe(List<Map<String, Object>> bizDataMap, SubscribeDetailConfig subscribeDetailConfig) {
-        String url = subscribeDetailConfig.getAddress();
-        String json = JSONArray.toJSONString(bizDataMap);
-        doPublishDataSubscribe(json, url);
+    public void publishDataSubscribe(List<Map<String, Object>> bizDataMapList, SubscribeDetailConfig subscribeDetailConfig) {
+        if (CollectionUtils.isEmpty(bizDataMapList)) {
+            return;
+        }
+        SensorhubSenderProperties sensorhubSenderProperties = SpringContextUtil.getBean(SensorhubSenderProperties.class);
+        // Return straight away when no callback address is configured
+        if (StringUtils.isEmpty(sensorhubSenderProperties.getUrl())) {
+            return;
+        }
+        Map<String, Object> contentMap = this.buildTypeDeviceData(bizDataMapList);
+        doPublishDataSubscribe(JSON.toJSONString(contentMap), sensorhubSenderProperties.getUrl());
    }
 
    /**
-     * Execute the method callback via HTTP POST
+     * Execute the method callback via HTTP POST
     *
     * @param json
     * @param url
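The body of doPublishDataSubscribe sits outside this hunk. Judging from the Apache HttpClient imports already present in the file, it plausibly resembles the sketch below; the exact implementation, the StringEntity usage, and the error handling are assumptions rather than part of the diff.

// Sketch only: a plausible doPublishDataSubscribe consistent with the imports above
// (CloseableHttpClient, HttpPost, ContentType, EntityUtils). StringEntity is assumed
// and would need import org.apache.http.entity.StringEntity.
private void doPublishDataSubscribe(String json, String url) {
    try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
        HttpPost httpPost = new HttpPost(url);
        httpPost.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // Drain the entity so the pooled connection can be reused
            EntityUtils.consume(response.getEntity());
        }
    } catch (IOException e) {
        // A failed callback should not break datagram parsing; log and continue
        e.printStackTrace();
    }
}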
b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java @@ -1,22 +1,22 @@ -//package com.casic.missiles.autoconfig; -// -//import org.apache.kafka.clients.producer.ProducerRecord; -//import org.apache.kafka.clients.producer.RecordMetadata; -//import org.springframework.kafka.support.ProducerListener; -//import org.springframework.stereotype.Component; -// -//import javax.annotation.Nullable; -// -//@Component -//public class KafkaSendResultHandler implements ProducerListener { -// -// @Override -// public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { -// System.out.println("消息发送成功:" + producerRecord.toString()); -// } -// -// @Override -// public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { -// System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage()); -// } -//} +package com.casic.missiles.autoconfig; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.support.ProducerListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Nullable; + +@Component +public class KafkaSendResultHandler implements ProducerListener { + + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { + System.out.println("消息发送成功:" + producerRecord.toString()); + } + + @Override + public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) { + System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage()); + } +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java index 1b1a261..3c58fe0 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java @@ -18,4 +18,7 @@ + + + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java new file mode 100644 index 0000000..da7916e --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java @@ -0,0 +1,19 @@ +package com.casic.missiles.autoconfig; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "sensorhub.subscribe") +public class SensorhubSenderProperties { + + /** + * 订阅bean + */ + private String bean = "kafka"; + + private String url; + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java index dfed36a..26b0b61 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java @@ -31,4 +31,12 @@ return responseData; } + @RequestMapping("/test") + public Object testNbCallBack(@RequestBody Map dataMap) { + ResponseData responseData = new ResponseData(); + System.out.println("----------" + JSON.toJSON(dataMap)); + responseData.setCode(200); 
+ return responseData; + } + } diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java index ec9a04f..36cbe02 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java @@ -54,7 +54,6 @@ if (ObjectUtil.isEmpty(protocolConfig)) { return null; } - ParseResult result = null; String devcode = null; //暂时先取第一个, 减少类的创建销毁与构建 @@ -122,7 +121,7 @@ } ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3); //存储数据 -// datagramEventProvider.storeData(bizDataMap); + datagramEventProvider.storeData(bizDataMap); } catch (RuntimeException rex) { log.error("解析出现异常,异常信息为{}", rex); //数据发送,异步,异常拦截 diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java new file mode 100644 index 0000000..d4918eb --- /dev/null +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java @@ -0,0 +1,73 @@ +package com.casic.missiles.parser.sender; + +import cn.hutool.core.date.DateUtil; +import org.springframework.util.CollectionUtils; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author cz + * @date 2024-03-26 + */ +public class SenderSupport { + + + /** + * 构建不同类型的数据结构 + * + * @param bizDataMapList 构建数据的初始化结果集 + */ + protected Map buildTypeDeviceData(List> bizDataMapList) { + Map contentMap = new HashMap(), mBody = new HashMap(); + Map bizDataMap = bizDataMapList.get(0); + switch ((Integer) bizDataMap.get("deviceType")) { + case 32: + //构建不同类型的下发配置事件 + contentMap.put("devType", "GasDetector"); + setEventType(bizDataMap, contentMap, mBody, "GasConfigSuccess", "GasConfigFail"); + break; + case 31: + contentMap.put("devType", "Pressure"); + setEventType(bizDataMap, contentMap, mBody, "PressureConfigSuccess", "PressureConfigFail"); + } + //构建值回复的类型 + if (bizDataMap.containsKey("dataValue")) { + contentMap.put("mType", "Data"); + if (bizDataMap.containsKey("cell")) { + mBody.put("cell", bizDataMap.get("cell")); + } + mBody.put("datas", bizDataMapList); + } + //构建三码上数的结构 + if (bizDataMap.containsKey("imei")) { + contentMap.put("mType", "StartupRequest"); + mBody.put("iccid", bizDataMap.get("iccid")); + mBody.put("imei", bizDataMap.get("imei")); + } + if (bizDataMap.containsKey("devcode")) { + contentMap.put("devCode", bizDataMap.get("devcode")); + } + contentMap.put("mBody", mBody); + return contentMap; + } + + /** + * 设置下发配置回复 + */ + private void setEventType(Map bizDataMap, Map contentMap, Map mBody, String bTypeSuccess, String bTypeFail) { + if (bizDataMap.containsKey("config")) { + contentMap.put("mType", "SetResponse"); + contentMap.put("ts", DateUtil.format(new Date(), "yyyyMMddHHmmss")); + if ("1".equals(bizDataMap.get("config"))) { + mBody.put("bType", bTypeSuccess); + } else { + mBody.put("bType", bTypeFail); + } + } + } + + +} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java index 787874a..0b2667a 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java @@ -1,8 +1,11 @@ package 
com.casic.missiles.parser.sender.impl; -import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSON; +import com.casic.missiles.autoconfig.SensorhubSenderProperties; import com.casic.missiles.parser.sender.DataSubscribeProvider; +import com.casic.missiles.parser.sender.SenderSupport; import com.casic.missiles.pojo.SubscribeDetailConfig; +import com.casic.missiles.util.SpringContextUtil; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ContentType; @@ -10,24 +13,36 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; +import org.apache.kafka.clients.producer.internals.Sender; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import java.io.IOException; import java.util.List; import java.util.Map; @Component("functionCallback") -public class FunctionCallback implements DataSubscribeProvider { +public class FunctionCallback extends SenderSupport implements DataSubscribeProvider { + @Override - public void publishDataSubscribe(List> bizDataMap, SubscribeDetailConfig subscribeDetailConfig) { - String url = subscribeDetailConfig.getAddress(); - String json = JSONArray.toJSONString(bizDataMap); - doPublishDataSubscribe(json, url); + public void publishDataSubscribe(List> bizDataMapList, SubscribeDetailConfig subscribeDetailConfig) { + if (CollectionUtils.isEmpty(bizDataMapList)) { + return; + } + SensorhubSenderProperties sensorhubSenderProperties = SpringContextUtil.getBean(SensorhubSenderProperties.class); + //如果地址为空,则直接返回结果 + if (StringUtils.isEmpty(sensorhubSenderProperties.getUrl())) { + return; + } + Map contentMap = this.buildTypeDeviceData(bizDataMapList); + doPublishDataSubscribe(JSON.toJSONString(contentMap), sensorhubSenderProperties.getUrl()); } /** - * 通过post方法执行方法回调 + * 通过post方法执行方法回调 * * @param json * @param url diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java index b95d1d8..288ed52 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java +++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java @@ -1,92 +1,44 @@ -//package com.casic.missiles.parser.sender.impl; -// -//import cn.hutool.core.date.DateUtil; -//import com.alibaba.fastjson.JSON; -//import com.casic.missiles.autoconfig.KafkaSendResultHandler; -//import com.casic.missiles.parser.sender.DataSubscribeProvider; -//import com.casic.missiles.pojo.SubscribeDetailConfig; -//import org.springframework.kafka.core.KafkaTemplate; -//import org.springframework.stereotype.Component; -//import org.springframework.transaction.annotation.Transactional; -//import org.springframework.util.CollectionUtils; -// -//import java.util.Date; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -// -///** -// * @author cz -// * @date 2023-11-10 -// */ -//@Component("kafka") -//public class KafkaSubscribe implements DataSubscribeProvider { -// -// private KafkaTemplate kafkaTemplate; -// -// public KafkaSubscribe(KafkaTemplate kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) { -// this.kafkaTemplate = kafkaTemplate; -// //回调方法、异常处理 -// 
this.kafkaTemplate.setProducerListener(kafkaSendResultHandler); -// } -// -// @Override -// @Transactional -// public void publishDataSubscribe(List> bizDataMapList, SubscribeDetailConfig subscribeDetailConfig) { -// if (CollectionUtils.isEmpty(bizDataMapList)) { -// return; -// } -// Map contentMap = new HashMap(), mBody = new HashMap(); -// Map bizDataMap = bizDataMapList.get(0); -// switch ((Integer) bizDataMap.get("deviceType")) { -// case 32: -// contentMap.put("devType", "GasDetector"); -// setEventType(bizDataMap, contentMap, mBody, "GasConfigSuccess", "GasConfigFail"); -// break; -// case 31: -// contentMap.put("devType", "Pressure"); -// setEventType(bizDataMap, contentMap, mBody, "PressureConfigSuccess", "PressureConfigFail"); -// } -// -// if (bizDataMap.containsKey("dataValue")) { -// contentMap.put("mType", "Data"); -// if (bizDataMap.containsKey("cell")) { -// mBody.put("cell", bizDataMap.get("cell")); -// } -// mBody.put("datas", bizDataMapList); -// sendKafkaMsg(bizDataMap, contentMap, mBody); -// } -// if (bizDataMap.containsKey("imei")) { -// contentMap.put("mType", "StartupRequest"); -// mBody.put("iccid", bizDataMap.get("iccid")); -// mBody.put("imei", bizDataMap.get("imei")); -// sendKafkaMsg(bizDataMap, contentMap, mBody); -// } -// -// } -// -// //设置下发配置回复 -// private void setEventType(Map bizDataMap, Map contentMap, Map mBody, String bTypeSuccess,String bTypeFail) { -// if (bizDataMap.containsKey("config")) { -// contentMap.put("mType", "SetResponse"); -// contentMap.put("ts", DateUtil.format(new Date(), "yyyyMMddHHmmss")); -// if ("1".equals(bizDataMap.get("config"))) { -// mBody.put("bType", bTypeSuccess); -// }else { -// mBody.put("bType", bTypeFail); -// } -// sendKafkaMsg(bizDataMap, contentMap, mBody); -// } -// -// } -// -// //设置kafka回复 -// private void sendKafkaMsg(Map bizDataMap, Map contentMap, Map mBody) { -// if (bizDataMap.containsKey("devcode")) { -// contentMap.put("devCode", bizDataMap.get("devcode")); -// } -// contentMap.put("mBody", mBody); -// kafkaTemplate.send("pressure", JSON.toJSONString(contentMap)); -// } -// -//} +package com.casic.missiles.parser.sender.impl; + +import cn.hutool.core.date.DateUtil; +import com.alibaba.fastjson.JSON; +import com.casic.missiles.autoconfig.KafkaSendResultHandler; +import com.casic.missiles.parser.sender.DataSubscribeProvider; +import com.casic.missiles.parser.sender.SenderSupport; +import com.casic.missiles.pojo.SubscribeDetailConfig; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author cz + * @date 2023-11-10 + */ +@Component("kafka") +public class KafkaSubscribe extends SenderSupport implements DataSubscribeProvider { + + private KafkaTemplate kafkaTemplate; + + public KafkaSubscribe(KafkaTemplate kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) { + this.kafkaTemplate = kafkaTemplate; + //回调方法、异常处理 + this.kafkaTemplate.setProducerListener(kafkaSendResultHandler); + } + + @Override + @Transactional + public void publishDataSubscribe(List> bizDataMapList, SubscribeDetailConfig subscribeDetailConfig) { + if (CollectionUtils.isEmpty(bizDataMapList)) { + return; + } + Map contentMap=this.buildTypeDeviceData(bizDataMapList); + kafkaTemplate.send("pressure", JSON.toJSONString(contentMap)); + } + +}
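Note: both providers now extend SenderSupport, so the Kafka topic and the HTTP callback carry an identical envelope built by buildTypeDeviceData. Below is a minimal sketch of that envelope for a single pressure reading; EnvelopeDemo is a hypothetical subclass that exists only to reach the protected builder, and the generic parameters are assumptions (the flattened diff drops them):

package com.casic.missiles.parser.sender;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EnvelopeDemo extends SenderSupport {
    public static void main(String[] args) {
        Map<String, Object> biz = new HashMap<>();
        biz.put("deviceType", 31);    // 31 selects devType "Pressure", 32 selects "GasDetector"
        biz.put("devcode", "P0001");  // copied to the top-level devCode field
        biz.put("dataValue", 0.35);   // presence of dataValue marks the envelope as mType "Data"
        Map payload = new EnvelopeDemo().buildTypeDeviceData(Collections.singletonList(biz));
        // expected shape: {devType=Pressure, mType=Data, devCode=P0001, mBody={datas=[{...}]}}
        System.out.println(payload);
    }
}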
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java index 833a4b1..260f7d3 100644 --- a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java +++
b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java @@ -3,10 +3,12 @@ import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; +import com.casic.missiles.autoconfig.SensorhubSenderProperties; import com.casic.missiles.cache.ProtocolProcessEventListener; import com.casic.missiles.parser.safe.SafeStrategy; import com.casic.missiles.parser.sender.DataSubscribeProvider; //import com.casic.missiles.parser.sender.impl.KafkaSubscribe; +import com.casic.missiles.parser.sender.impl.KafkaSubscribe; import com.casic.missiles.pojo.*; import com.casic.missiles.registry.DatagramEventRegistry; import com.casic.missiles.registry.SubscribeRegistry; @@ -148,8 +150,9 @@ * 数据订阅 */ public void storeData(List> bizDataMap) { -// DataSubscribeProvider subscribeProvider = SpringContextUtil.getBean(KafkaSubscribe.class); -// subscribeProvider.publishDataSubscribe(bizDataMap, null); + SensorhubSenderProperties sensorhubSenderProperties = SpringContextUtil.getBean(sensorhubSenderProperties.getBean()); + DataSubscribeProvider subscribeProvider = SpringContextUtil.getBean( sensorhubSenderProperties.getBean()); + subscribeProvider.publishDataSubscribe(bizDataMap, null); } // DataSubscribeProvider dataSubscribeProvider = SpringContextUtil.getBean(processorInstance.getSubscribeBean());
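Note: storeData no longer hard-wires KafkaSubscribe; it resolves whichever DataSubscribeProvider bean sensorhub.subscribe.bean names ("kafka" or "functionCallback"), so the transport can be swapped in application.yml alone. A minimal binding check, assuming a standard Spring Boot test setup (class name and property values here are illustrative):

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.casic.missiles.autoconfig.SensorhubSenderProperties;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(properties = {
        "sensorhub.subscribe.bean=functionCallback",
        "sensorhub.subscribe.url=http://127.0.0.1:7093/push/test"
})
class SensorhubSenderPropertiesTest {

    @Autowired
    private SensorhubSenderProperties props;

    @Test
    void bindsSubscribeKeys() {
        // @ConfigurationProperties(prefix = "sensorhub.subscribe") binds both keys
        assertEquals("functionCallback", props.getBean());
        assertEquals("http://127.0.0.1:7093/push/test", props.getUrl());
    }
}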
-// -//import org.apache.kafka.clients.CommonClientConfigs; -//import org.apache.kafka.clients.consumer.ConsumerConfig; -//import org.apache.kafka.common.config.SaslConfigs; -//import org.apache.kafka.common.security.auth.SecurityProtocol; -//import org.apache.kafka.common.serialization.StringDeserializer; -//import org.springframework.beans.factory.annotation.Value; -//import org.springframework.boot.SpringBootConfiguration; -//import org.springframework.context.annotation.Bean; -//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -//import org.springframework.kafka.config.KafkaListenerContainerFactory; -//import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -//import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -//import org.springframework.kafka.listener.ContainerProperties; -// -//import java.util.HashMap; -//import java.util.Map; -// -// -///** -// * @author cz -// */ -//@SpringBootConfiguration -//public class KafkaConsumerConfig { -// -// @Value("${spring.kafka.consumer.bootstrap-servers}") -// private String bootstrapServers; -// @Value("${spring.kafka.consumer.group-id}") -// private String groupId; -// @Value("${spring.kafka.consumer.enable-auto-commit}") -// private boolean enableAutoCommit; -// @Value("${spring.kafka.properties.session.timeout.ms}") -// private String sessionTimeout; -// @Value("${spring.kafka.properties.max.poll.interval.ms}") -// private String maxPollIntervalTime; -// @Value("${spring.kafka.consumer.max-poll-records}") -// private String maxPollRecords; -// @Value("${spring.kafka.consumer.auto-offset-reset}") -// private String autoOffsetReset; -// @Value("${spring.kafka.listener.concurrency}") -// private Integer concurrency; -// @Value("${spring.kafka.listener.missing-topics-fatal}") -// private boolean missingTopicsFatal; -// -// private final long pollTimeout = 600000; -// -// @Bean -// public Map consumerConfigs() { -// Map propsMap = new HashMap<>(16); -// // 服务器地址,不多说配置直接用 -// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); -// // groupId不多说,直接用 -// propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); -// //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 -// propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); -// //自动提交的时间间隔,自动提交开启时生效 -// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); -// //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: -// //我们使用latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 -// propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); -// //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance -// propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime); -// //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。 -// propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); -// //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s -// propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); -// //序列化(我们这边使用StringDeserializer,与生产者保持一致) -// propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); -// propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); -// // 下面四个参数是用户名密码的参数,没有用户名密码可以去掉以下配置 -//// propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); -//// propsMap.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); -//// propsMap.put("java.security.auth.login.config", "10000"); -// // 这里username设置用户名, password设置密码我写死到代码里了,可以更改为nacos配置 -//// 
propsMap.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";")); -// return propsMap; -// } -// -// // 消费者工厂,将配置信息加载进去 -// @Bean("consumerFactory") -// public DefaultKafkaConsumerFactory consumerFactory() { -// return new DefaultKafkaConsumerFactory(consumerConfigs()); -// } -// -// @Bean("listenerContainerFactory") -// public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { -// ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); -// factory.setConsumerFactory(consumerFactory()); -// //在侦听器容器中运行的线程数,一般设置为 机器数*分区数 -// factory.setConcurrency(concurrency); -// //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误 -// factory.getContainerProperties().setMissingTopicsFatal(missingTopicsFatal); -// //自动提交关闭,需要设置手动消息确认 -// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); -// factory.getContainerProperties().setPollTimeout(pollTimeout); -// return factory; -// } -//} -// diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java deleted file mode 100644 index 6600312..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListener.java +++ /dev/null @@ -1,31 +0,0 @@ -//package com.casic.missiles.autoconfig; -// -//import org.apache.kafka.clients.consumer.ConsumerRecord; -//import org.springframework.kafka.annotation.KafkaListener; -//import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -//import org.springframework.kafka.support.Acknowledgment; -//import org.springframework.web.bind.annotation.RestController; -// -//import javax.annotation.Resource; -// -//@RestController() -//public class KafkaConsumerListener{ -// @Resource -// private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; -// /** -// * 监听kafka消息 -// * -// * 使用autoStartup = "false"必须指定id -// */ -// @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = {"KAFKA_TEST_TOPICS"}, autoStartup = "true") -// public void listenTopics(ConsumerRecord consumerRecord, Acknowledgment ack) { -// try { -// System.out.println("listenTopics接受消息:" + consumerRecord.value()); -// //手动确认 -// ack.acknowledge(); -// } catch (Exception e) { -// System.out.println("消费失败:" + e); -// } -// } -// -//} diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java deleted file mode 100644 index dbc561c..0000000 --- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java +++ /dev/null @@ -1,29 +0,0 @@ -//package com.casic.missiles.autoconfig; -// -//import edu.umd.cs.findbugs.annotations.NonNull; -//import org.apache.kafka.clients.consumer.Consumer; -//import org.springframework.kafka.listener.KafkaListenerErrorHandler; -//import org.springframework.kafka.listener.ListenerExecutionFailedException; -//import org.springframework.messaging.Message; -//import org.springframework.stereotype.Component; -// -//@Component -//public class KafkaConsumerListenerError implements KafkaListenerErrorHandler { -// -// -// @Override -// @NonNull -// public Object handleError(Message message, ListenerExecutionFailedException e) { -// return new Object(); -// } -// -// @Override -// public Object handleError(Message message, 
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java
deleted file mode 100644
index dbc561c..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaConsumerListenerError.java
+++ /dev/null
@@ -1,29 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import edu.umd.cs.findbugs.annotations.NonNull;
-//import org.apache.kafka.clients.consumer.Consumer;
-//import org.springframework.kafka.listener.KafkaListenerErrorHandler;
-//import org.springframework.kafka.listener.ListenerExecutionFailedException;
-//import org.springframework.messaging.Message;
-//import org.springframework.stereotype.Component;
-//
-//@Component
-//public class KafkaConsumerListenerError implements KafkaListenerErrorHandler {
-//
-//    @Override
-//    @NonNull
-//    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
-//        return new Object();
-//    }
-//
-//    @Override
-//    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
-//        System.out.println("Message detail: " + message);
-//        System.out.println("Exception info: " + exception);
-//        System.out.println("Consumer detail: " + consumer.groupMetadata());
-//        System.out.println("Listened topics: " + consumer.listTopics());
-//        return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
-//    }
-//
-//}
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaProviderConfig.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaProviderConfig.java
deleted file mode 100644
index 1d605ed..0000000
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaProviderConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.CommonClientConfigs;
-//import org.apache.kafka.clients.producer.ProducerConfig;
-//import org.apache.kafka.common.config.SaslConfigs;
-//import org.apache.kafka.common.security.auth.SecurityProtocol;
-//import org.apache.kafka.common.serialization.StringSerializer;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.boot.SpringBootConfiguration;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Primary;
-//import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.kafka.core.ProducerFactory;
-//import org.springframework.kafka.transaction.KafkaTransactionManager;
-//
-//import java.util.HashMap;
-//import java.util.Map;
-//
-///**
-// * @author cz
-// */
-//@SpringBootConfiguration
-//public class KafkaProviderConfig {
-//
-//    @Value("${spring.kafka.producer.bootstrap-servers}")
-//    private String bootstrapServers;
-//    @Value("${spring.kafka.producer.acks}")
-//    private String acks;
-//    @Value("${spring.kafka.producer.retries}")
-//    private String retries;
-//    @Value("${spring.kafka.producer.batch-size}")
-//    private String batchSize;
-//    @Value("${spring.kafka.producer.buffer-memory}")
-//    private String bufferMemory;
-//
-//    @Bean
-//    public Map<String, Object> producerConfigs() {
-//        Map<String, Object> props = new HashMap<>(16);
-//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-//        // Ack mode; with acks=all the producer only gets a success response once every replicating node has received the message
-//        props.put(ProducerConfig.ACKS_CONFIG, acks);
-//        // Number of resends after an error; must be greater than 0 when transactions are enabled
-//        props.put(ProducerConfig.RETRIES_CONFIG, retries);
-//        // Messages headed for the same partition are batched together rather than sent one by one, reducing request round-trips
-//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
-//        // With light traffic a batch may take very long (say 5 min) to fill 16KB, adding latency, so a time limit is set as well; once it is reached the batch is sent regardless
-//        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
-//        // Size of the producer's memory buffer
-//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
-//        // Serializers, matching the consumer
-//        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-//        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-//
-//        // Username/password settings; remove when no credentials are needed
-////        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
-////        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-////        props.put("java.security.auth.login.config", "10000");
-//        // Could be kept in the nacos configuration file instead
-////        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";"));
-//        return props;
-//    }
-//
-//    // Producer factory
-//    @Bean("kafkaProduceFactory")
-//    public ProducerFactory<String, String> producerFactory() {
-//        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
-//        factory.setTransactionIdPrefix("kafkaTx-");
-//        return factory;
-//    }
-//
-//    // Transaction handling
-//    // This transaction manager tends to conflict with the project's other transactions, so the @Bean is usually dropped to avoid the spring proxy
-//    @Bean("kafkaTransactionManager")
-//    @Primary
-//    public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
-//        return new KafkaTransactionManager<>(producerFactory);
-//    }
-//
-//    @Bean
-//    public KafkaTemplate<String, String> kafkaTemplate() {
-//        return new KafkaTemplate<>(producerFactory());
-//    }
-//
-//}
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java
index 1598ee0..1fdac64 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/KafkaSendResultHandler.java
@@ -1,22 +1,22 @@
-//package com.casic.missiles.autoconfig;
-//
-//import org.apache.kafka.clients.producer.ProducerRecord;
-//import org.apache.kafka.clients.producer.RecordMetadata;
-//import org.springframework.kafka.support.ProducerListener;
-//import org.springframework.stereotype.Component;
-//
-//import javax.annotation.Nullable;
-//
-//@Component
-//public class KafkaSendResultHandler implements ProducerListener<String, String> {
-//
-//    @Override
-//    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
-//        System.out.println("Message sent successfully: " + producerRecord.toString());
-//    }
-//
-//    @Override
-//    public void onError(ProducerRecord<String, String> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
-//        System.out.println("Message send failed: " + producerRecord.toString() + exception.getMessage());
-//    }
-//}
+package com.casic.missiles.autoconfig;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.kafka.support.ProducerListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Nullable;
+
+@Component
+public class KafkaSendResultHandler implements ProducerListener<String, String> {
+
+    @Override
+    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
+        System.out.println("Message sent successfully: " + producerRecord.toString());
+    }
+
+    @Override
+    public void onError(ProducerRecord<String, String> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
+        System.out.println("Message send failed: " + producerRecord.toString() + exception.getMessage());
+    }
+}
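KafkaSendResultHandler is live code again, but a ProducerListener only fires once it has been registered on a template. A minimal wiring sketch, assuming both beans are in the Spring context; the class name SendResultWiringSketch and the lookup style are illustrative, and KafkaSubscribe further down performs this same registration in its constructor:

    import org.springframework.context.ApplicationContext;
    import org.springframework.kafka.core.KafkaTemplate;
    import com.casic.missiles.autoconfig.KafkaSendResultHandler;

    // Sketch only: not code from this change set.
    public class SendResultWiringSketch {
        @SuppressWarnings("unchecked")
        public static void wire(ApplicationContext context) {
            KafkaTemplate<String, String> template = context.getBean(KafkaTemplate.class);
            KafkaSendResultHandler handler = context.getBean(KafkaSendResultHandler.class);
            // From here on, every send(...) reports asynchronously to onSuccess/onError
            template.setProducerListener(handler);
            template.send("pressure", "{\"mType\":\"Data\"}");
        }
    }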
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java
index 1b1a261..3c58fe0 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubProperties.java
@@ -18,4 +18,7 @@
+
+
+
 }
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java
new file mode 100644
index 0000000..da7916e
--- /dev/null
+++ b/sensorhub-core/src/main/java/com/casic/missiles/autoconfig/SensorhubSenderProperties.java
@@ -0,0 +1,19 @@
+package com.casic.missiles.autoconfig;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+@ConfigurationProperties(prefix = "sensorhub.subscribe")
+public class SensorhubSenderProperties {
+
+    /**
+     * Subscription provider bean name
+     */
+    private String bean = "kafka";
+
+    private String url;
+
+}
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java
index dfed36a..26b0b61 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/controller/ReceiveController.java
@@ -31,4 +31,12 @@
         return responseData;
     }
 
+    @RequestMapping("/test")
+    public Object testNbCallBack(@RequestBody Map<String, Object> dataMap) {
+        ResponseData responseData = new ResponseData();
+        System.out.println("----------" + JSON.toJSON(dataMap));
+        responseData.setCode(200);
+        return responseData;
+    }
+
 }
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java
index ec9a04f..36cbe02 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/GenericProtocolParser.java
@@ -54,7 +54,6 @@
         if (ObjectUtil.isEmpty(protocolConfig)) {
             return null;
         }
-
         ParseResult result = null;
         String devcode = null;
         // Take the first one for now, reducing class creation/destruction and construction
@@ -122,7 +121,7 @@
         }
         ProtocolProcessEventListener.setTask(devcode, bizDataMap, 3);
         // Store the data
-//        datagramEventProvider.storeData(bizDataMap);
+        datagramEventProvider.storeData(bizDataMap);
     } catch (RuntimeException rex) {
         log.error("Parsing failed with exception: {}", rex);
         // Data dispatch: async, with exception interception
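With storeData re-enabled in GenericProtocolParser, every parsed frame now reaches the subscribe dispatch, and the new sensorhub.subscribe properties decide where it goes. A small sketch of how the binding resolves at runtime, assuming the application.yml values from this change set (the class name SubscribeConfigSketch is hypothetical; SpringContextUtil.getBean is used exactly as elsewhere in this diff):

    import com.casic.missiles.autoconfig.SensorhubSenderProperties;
    import com.casic.missiles.util.SpringContextUtil;

    // Sketch only: illustrates the property binding added above.
    public class SubscribeConfigSketch {
        public static void show() {
            SensorhubSenderProperties props = SpringContextUtil.getBean(SensorhubSenderProperties.class);
            // With this change set's application.yml:
            //   sensorhub.subscribe.bean -> "functionCallback"
            //   sensorhub.subscribe.url  -> "http://127.0.0.1:7093/push/test"
            // When the keys are absent, bean falls back to its default, "kafka".
            System.out.println(props.getBean());
            System.out.println(props.getUrl());
        }
    }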
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java
new file mode 100644
index 0000000..d4918eb
--- /dev/null
+++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/SenderSupport.java
@@ -0,0 +1,73 @@
+package com.casic.missiles.parser.sender;
+
+import cn.hutool.core.date.DateUtil;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author cz
+ * @date 2024-03-26
+ */
+public class SenderSupport {
+
+    /**
+     * Build the message structures for the different data types
+     *
+     * @param bizDataMapList the initial result set to build the data from
+     */
+    protected Map<String, Object> buildTypeDeviceData(List<Map<String, Object>> bizDataMapList) {
+        Map<String, Object> contentMap = new HashMap<>(), mBody = new HashMap<>();
+        Map<String, Object> bizDataMap = bizDataMapList.get(0);
+        switch ((Integer) bizDataMap.get("deviceType")) {
+            case 32:
+                // Build the config-downlink reply event for each device type
+                contentMap.put("devType", "GasDetector");
+                setEventType(bizDataMap, contentMap, mBody, "GasConfigSuccess", "GasConfigFail");
+                break;
+            case 31:
+                contentMap.put("devType", "Pressure");
+                setEventType(bizDataMap, contentMap, mBody, "PressureConfigSuccess", "PressureConfigFail");
+        }
+        // Build the data-value reply
+        if (bizDataMap.containsKey("dataValue")) {
+            contentMap.put("mType", "Data");
+            if (bizDataMap.containsKey("cell")) {
+                mBody.put("cell", bizDataMap.get("cell"));
+            }
+            mBody.put("datas", bizDataMapList);
+        }
+        // Build the startup registration structure (IMEI/ICCID/devcode upload)
+        if (bizDataMap.containsKey("imei")) {
+            contentMap.put("mType", "StartupRequest");
+            mBody.put("iccid", bizDataMap.get("iccid"));
+            mBody.put("imei", bizDataMap.get("imei"));
+        }
+        if (bizDataMap.containsKey("devcode")) {
+            contentMap.put("devCode", bizDataMap.get("devcode"));
+        }
+        contentMap.put("mBody", mBody);
+        return contentMap;
+    }
+
+    /**
+     * Set the config-downlink reply type
+     */
+    private void setEventType(Map<String, Object> bizDataMap, Map<String, Object> contentMap, Map<String, Object> mBody, String bTypeSuccess, String bTypeFail) {
+        if (bizDataMap.containsKey("config")) {
+            contentMap.put("mType", "SetResponse");
+            contentMap.put("ts", DateUtil.format(new Date(), "yyyyMMddHHmmss"));
+            if ("1".equals(bizDataMap.get("config"))) {
+                mBody.put("bType", bTypeSuccess);
+            } else {
+                mBody.put("bType", bTypeFail);
+            }
+        }
+    }
+
+}
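To make the envelope concrete, a worked example of what buildTypeDeviceData produces for a single pressure reading; SenderSupportDemo and all values are illustrative, not taken from a real device:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.fastjson.JSON;
    import com.casic.missiles.parser.sender.SenderSupport;

    public class SenderSupportDemo extends SenderSupport {
        public static void main(String[] args) {
            Map<String, Object> reading = new HashMap<>();
            reading.put("deviceType", 31);            // 31 -> devType "Pressure"
            reading.put("devcode", "312024000001");   // hypothetical device code
            reading.put("dataValue", 0.86);           // presence of dataValue -> mType "Data"
            reading.put("cell", 95);                  // battery level, copied into mBody
            Map<String, Object> envelope = new SenderSupportDemo()
                    .buildTypeDeviceData(Collections.singletonList(reading));
            // Roughly: {"devType":"Pressure","mType":"Data","devCode":"312024000001",
            //           "mBody":{"cell":95,"datas":[{...the reading...}]}}
            System.out.println(JSON.toJSONString(envelope));
        }
    }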
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java
index 787874a..0b2667a 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/FunctionCallback.java
@@ -1,8 +1,11 @@
 package com.casic.missiles.parser.sender.impl;
 
-import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSON;
+import com.casic.missiles.autoconfig.SensorhubSenderProperties;
 import com.casic.missiles.parser.sender.DataSubscribeProvider;
+import com.casic.missiles.parser.sender.SenderSupport;
 import com.casic.missiles.pojo.SubscribeDetailConfig;
+import com.casic.missiles.util.SpringContextUtil;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ContentType;
@@ -10,24 +13,36 @@
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 @Component("functionCallback")
-public class FunctionCallback implements DataSubscribeProvider {
+public class FunctionCallback extends SenderSupport implements DataSubscribeProvider {
+
     @Override
-    public void publishDataSubscribe(List<Map<String, Object>> bizDataMap, SubscribeDetailConfig subscribeDetailConfig) {
-        String url = subscribeDetailConfig.getAddress();
-        String json = JSONArray.toJSONString(bizDataMap);
-        doPublishDataSubscribe(json, url);
+    public void publishDataSubscribe(List<Map<String, Object>> bizDataMapList, SubscribeDetailConfig subscribeDetailConfig) {
+        if (CollectionUtils.isEmpty(bizDataMapList)) {
+            return;
+        }
+        SensorhubSenderProperties sensorhubSenderProperties = SpringContextUtil.getBean(SensorhubSenderProperties.class);
+        // If no address is configured, return immediately
+        if (StringUtils.isEmpty(sensorhubSenderProperties.getUrl())) {
+            return;
+        }
+        Map<String, Object> contentMap = this.buildTypeDeviceData(bizDataMapList);
+        doPublishDataSubscribe(JSON.toJSONString(contentMap), sensorhubSenderProperties.getUrl());
     }
 
     /**
-     * Execute the method callback via HTTP POST
+     * Execute the method callback via HTTP POST
      *
      * @param json
      * @param url
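FunctionCallback no longer takes its push address from the per-subscription SubscribeDetailConfig; it builds the shared SenderSupport envelope and posts it to the globally configured URL. A sketch of exercising that path against the /push/test endpoint added above; FunctionCallbackSketch and the device values are hypothetical:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import com.casic.missiles.parser.sender.DataSubscribeProvider;
    import com.casic.missiles.util.SpringContextUtil;

    // Assumes the application from this change set is running and
    // sensorhub.subscribe.url points at ReceiveController's /push/test endpoint.
    public class FunctionCallbackSketch {
        public static void push() {
            DataSubscribeProvider provider = SpringContextUtil.getBean("functionCallback");
            Map<String, Object> reading = new HashMap<>();
            reading.put("deviceType", 32);            // 32 -> devType "GasDetector"
            reading.put("devcode", "322024000001");   // hypothetical device code
            reading.put("dataValue", 2.4);
            provider.publishDataSubscribe(Collections.singletonList(reading), null);
            // ReceiveController#testNbCallBack logs the posted envelope and answers code 200.
        }
    }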
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java
index b95d1d8..288ed52 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/parser/sender/impl/KafkaSubscribe.java
@@ -1,92 +1,44 @@
-//package com.casic.missiles.parser.sender.impl;
-//
-//import cn.hutool.core.date.DateUtil;
-//import com.alibaba.fastjson.JSON;
-//import com.casic.missiles.autoconfig.KafkaSendResultHandler;
-//import com.casic.missiles.parser.sender.DataSubscribeProvider;
-//import com.casic.missiles.pojo.SubscribeDetailConfig;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.stereotype.Component;
-//import org.springframework.transaction.annotation.Transactional;
-//import org.springframework.util.CollectionUtils;
-//
-//import java.util.Date;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//
-///**
-// * @author cz
-// * @date 2023-11-10
-// */
-//@Component("kafka")
-//public class KafkaSubscribe implements DataSubscribeProvider {
-//
-//    private KafkaTemplate<String, String> kafkaTemplate;
-//
-//    public KafkaSubscribe(KafkaTemplate<String, String> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) {
-//        this.kafkaTemplate = kafkaTemplate;
-//        // Result callback and exception handling
-//        this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
-//    }
-//
-//    @Override
-//    @Transactional
-//    public void publishDataSubscribe(List<Map<String, Object>> bizDataMapList, SubscribeDetailConfig subscribeDetailConfig) {
-//        if (CollectionUtils.isEmpty(bizDataMapList)) {
-//            return;
-//        }
-//        Map<String, Object> contentMap = new HashMap<>(), mBody = new HashMap<>();
-//        Map<String, Object> bizDataMap = bizDataMapList.get(0);
-//        switch ((Integer) bizDataMap.get("deviceType")) {
-//            case 32:
-//                contentMap.put("devType", "GasDetector");
-//                setEventType(bizDataMap, contentMap, mBody, "GasConfigSuccess", "GasConfigFail");
-//                break;
-//            case 31:
-//                contentMap.put("devType", "Pressure");
-//                setEventType(bizDataMap, contentMap, mBody, "PressureConfigSuccess", "PressureConfigFail");
-//        }
-//
-//        if (bizDataMap.containsKey("dataValue")) {
-//            contentMap.put("mType", "Data");
-//            if (bizDataMap.containsKey("cell")) {
-//                mBody.put("cell", bizDataMap.get("cell"));
-//            }
-//            mBody.put("datas", bizDataMapList);
-//            sendKafkaMsg(bizDataMap, contentMap, mBody);
-//        }
-//        if (bizDataMap.containsKey("imei")) {
-//            contentMap.put("mType", "StartupRequest");
-//            mBody.put("iccid", bizDataMap.get("iccid"));
-//            mBody.put("imei", bizDataMap.get("imei"));
-//            sendKafkaMsg(bizDataMap, contentMap, mBody);
-//        }
-//
-//    }
-//
-//    // Set the config-downlink reply type
-//    private void setEventType(Map<String, Object> bizDataMap, Map<String, Object> contentMap, Map<String, Object> mBody, String bTypeSuccess, String bTypeFail) {
-//        if (bizDataMap.containsKey("config")) {
-//            contentMap.put("mType", "SetResponse");
-//            contentMap.put("ts", DateUtil.format(new Date(), "yyyyMMddHHmmss"));
-//            if ("1".equals(bizDataMap.get("config"))) {
-//                mBody.put("bType", bTypeSuccess);
-//            } else {
-//                mBody.put("bType", bTypeFail);
-//            }
-//            sendKafkaMsg(bizDataMap, contentMap, mBody);
-//        }
-//
-//    }
-//
-//    // Assemble and send the kafka reply
-//    private void sendKafkaMsg(Map<String, Object> bizDataMap, Map<String, Object> contentMap, Map<String, Object> mBody) {
-//        if (bizDataMap.containsKey("devcode")) {
-//            contentMap.put("devCode", bizDataMap.get("devcode"));
-//        }
-//        contentMap.put("mBody", mBody);
-//        kafkaTemplate.send("pressure", JSON.toJSONString(contentMap));
-//    }
-//
-//}
+package com.casic.missiles.parser.sender.impl;
+
+import cn.hutool.core.date.DateUtil;
+import com.alibaba.fastjson.JSON;
+import com.casic.missiles.autoconfig.KafkaSendResultHandler;
+import com.casic.missiles.parser.sender.DataSubscribeProvider;
+import com.casic.missiles.parser.sender.SenderSupport;
+import com.casic.missiles.pojo.SubscribeDetailConfig;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author cz
+ * @date 2023-11-10
+ */
+@Component("kafka")
+public class KafkaSubscribe extends SenderSupport implements DataSubscribeProvider {
+
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    public KafkaSubscribe(KafkaTemplate<String, String> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) {
+        this.kafkaTemplate = kafkaTemplate;
+        // Result callback and exception handling
+        this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
+    }
+
+    @Override
+    @Transactional
+    public void publishDataSubscribe(List<Map<String, Object>> bizDataMapList, SubscribeDetailConfig subscribeDetailConfig) {
+        if (CollectionUtils.isEmpty(bizDataMapList)) {
+            return;
+        }
+        Map<String, Object> contentMap = this.buildTypeDeviceData(bizDataMapList);
+        kafkaTemplate.send("pressure", JSON.toJSONString(contentMap));
+    }
+
+}
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java
index 833a4b1..260f7d3 100644
--- a/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java
+++ b/sensorhub-core/src/main/java/com/casic/missiles/provider/ProcessorInstanceProvider.java
@@ -3,10 +3,12 @@
 
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
+import com.casic.missiles.autoconfig.SensorhubSenderProperties;
 import com.casic.missiles.cache.ProtocolProcessEventListener;
 import com.casic.missiles.parser.safe.SafeStrategy;
 import com.casic.missiles.parser.sender.DataSubscribeProvider;
 //import com.casic.missiles.parser.sender.impl.KafkaSubscribe;
+import com.casic.missiles.parser.sender.impl.KafkaSubscribe;
 import com.casic.missiles.pojo.*;
 import com.casic.missiles.registry.DatagramEventRegistry;
 import com.casic.missiles.registry.SubscribeRegistry;
@@ -148,8 +150,9 @@
      * Data subscription
      */
     public void storeData(List<Map<String, Object>> bizDataMap) {
-//        DataSubscribeProvider subscribeProvider = SpringContextUtil.getBean(KafkaSubscribe.class);
-//        subscribeProvider.publishDataSubscribe(bizDataMap, null);
+        SensorhubSenderProperties sensorhubSenderProperties = SpringContextUtil.getBean(SensorhubSenderProperties.class);
+        DataSubscribeProvider subscribeProvider = SpringContextUtil.getBean(sensorhubSenderProperties.getBean());
+        subscribeProvider.publishDataSubscribe(bizDataMap, null);
     }
 
 //    DataSubscribeProvider dataSubscribeProvider = SpringContextUtil.getBean(processorInstance.getSubscribeBean());
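Both subscribers now inherit the same envelope logic from SenderSupport, so switching transports is a YAML change rather than a code change. For the configuration-reply branch they share, a worked example of the SetResponse shape; SetResponseSketch and all values are illustrative, and ts is whatever time the reply is built:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.fastjson.JSON;
    import com.casic.missiles.parser.sender.SenderSupport;

    public class SetResponseSketch extends SenderSupport {
        public static void main(String[] args) {
            Map<String, Object> ack = new HashMap<>();
            ack.put("deviceType", 32);          // 32 -> devType "GasDetector"
            ack.put("devcode", "322024000001"); // hypothetical device code
            ack.put("config", "1");             // "1" -> "GasConfigSuccess", anything else -> "GasConfigFail"
            Map<String, Object> envelope = new SetResponseSketch()
                    .buildTypeDeviceData(Collections.singletonList(ack));
            // Roughly: {"devType":"GasDetector","mType":"SetResponse","ts":"20240326150000",
            //           "devCode":"322024000001","mBody":{"bType":"GasConfigSuccess"}}
            System.out.println(JSON.toJSONString(envelope));
        }
    }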
diff --git a/sensorhub-core/src/main/java/com/casic/missiles/util/HttpClientUtils.java b/sensorhub-core/src/main/java/com/casic/missiles/util/HttpClientUtils.java
new file mode 100644
index 0000000..f5ed6a4
--- /dev/null
+++ b/sensorhub-core/src/main/java/com/casic/missiles/util/HttpClientUtils.java
@@ -0,0 +1,38 @@
+package com.casic.missiles.util;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.methods.PostMethod;
+
+import java.io.IOException;
+
+public class HttpClientUtils {
+
+    public static String post(String url, String body, JSONObject header) {
+        HttpClient client = new HttpClient();
+        // Connection/socket timeout, in milliseconds
+        client.setTimeout(30000);
+        PostMethod method = new PostMethod(url);
+        method.addRequestHeader("Content-type", "application/json; charset=utf-8");
+        method.addRequestHeader("Accept", "application/json");
+        method.addRequestHeader("X-APP-KEY", header.getString("X-APP-KEY"));
+        method.setRequestBody(body);
+        try {
+            int statusCode = client.executeMethod(method);
+            if (statusCode == HttpStatus.SC_OK) {
+                return new String(method.getResponseBody(), "UTF-8");
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            // Release the connection back to the manager
+            method.releaseConnection();
+        }
+        return null;
+    }
+
+}
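A usage sketch for the new helper; the endpoint reuses this change set's /push/test address, while HttpClientUtilsDemo, the app key, and the payload are hypothetical:

    import com.alibaba.fastjson.JSONObject;
    import com.casic.missiles.util.HttpClientUtils;

    public class HttpClientUtilsDemo {
        public static void main(String[] args) {
            JSONObject header = new JSONObject();
            header.put("X-APP-KEY", "demo-app-key"); // hypothetical credential
            String body = "{\"devCode\":\"312024000001\",\"mType\":\"Data\"}";
            // POSTs JSON with a 30s timeout; returns the response body on HTTP 200, null otherwise
            String response = HttpClientUtils.post("http://127.0.0.1:7093/push/test", body, header);
            System.out.println(response);
        }
    }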