diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java new file mode 100644 index 0000000..0fe2ded --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java @@ -0,0 +1,70 @@ +package com.casic.missiles.mqtt.receiver; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + private MqttMessageReceiver mqttMessageReceiver; + + /** + * 此处可以使用其他消息通道 + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + * + * @return + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + * + * @param + * @param + * @return + */ + @Bean + public MessageProducerSupport mqttInbound() { + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); + + adapter.setCompletionTimeout(60000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setRecoveryInterval(10000); + adapter.setQos(0); + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + * + * @return + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java new file mode 100644 index 0000000..0fe2ded --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java @@ -0,0 +1,70 @@ +package com.casic.missiles.mqtt.receiver; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + private MqttMessageReceiver mqttMessageReceiver; + + /** + * 此处可以使用其他消息通道 + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + * + * @return + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + * + * @param + * @param + * @return + */ + @Bean + public MessageProducerSupport mqttInbound() { + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); + + adapter.setCompletionTimeout(60000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setRecoveryInterval(10000); + adapter.setQos(0); + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + * + * @return + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java new file mode 100644 index 0000000..282b99d --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java @@ -0,0 +1,32 @@ +package com.casic.missiles.mqtt.receiver; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Component; + +@Slf4j +@AllArgsConstructor +@Component +public class MqttMessageReceiver implements MessageHandler { + + @Override + public void handleMessage(Message message) throws MessagingException { + try { + MessageHeaders headers = message.getHeaders(); + //获取消息Topic + String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); + log.info("[获取到的消息的topic :]{} ", receivedTopic); + //获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload :]{} ", payload); + //todo .... + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java new file mode 100644 index 0000000..0fe2ded --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java @@ -0,0 +1,70 @@ +package com.casic.missiles.mqtt.receiver; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + private MqttMessageReceiver mqttMessageReceiver; + + /** + * 此处可以使用其他消息通道 + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + * + * @return + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + * + * @param + * @param + * @return + */ + @Bean + public MessageProducerSupport mqttInbound() { + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); + + adapter.setCompletionTimeout(60000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setRecoveryInterval(10000); + adapter.setQos(0); + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + * + * @return + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java new file mode 100644 index 0000000..282b99d --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java @@ -0,0 +1,32 @@ +package com.casic.missiles.mqtt.receiver; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Component; + +@Slf4j +@AllArgsConstructor +@Component +public class MqttMessageReceiver implements MessageHandler { + + @Override + public void handleMessage(Message message) throws MessagingException { + try { + MessageHeaders headers = message.getHeaders(); + //获取消息Topic + String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); + log.info("[获取到的消息的topic :]{} ", receivedTopic); + //获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload :]{} ", payload); + //todo .... + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java new file mode 100644 index 0000000..1615fa0 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java @@ -0,0 +1,45 @@ +package com.casic.missiles.mqtt.sender; + +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.mqtt.MqttGateway; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@AllArgsConstructor +public class MqttMessageSender { + + private MqttGateway mqttGateway; + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody){ + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message){ + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java new file mode 100644 index 0000000..0fe2ded --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java @@ -0,0 +1,70 @@ +package com.casic.missiles.mqtt.receiver; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + private MqttMessageReceiver mqttMessageReceiver; + + /** + * 此处可以使用其他消息通道 + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + * + * @return + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + * + * @param + * @param + * @return + */ + @Bean + public MessageProducerSupport mqttInbound() { + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); + + adapter.setCompletionTimeout(60000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setRecoveryInterval(10000); + adapter.setQos(0); + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + * + * @return + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java new file mode 100644 index 0000000..282b99d --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java @@ -0,0 +1,32 @@ +package com.casic.missiles.mqtt.receiver; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Component; + +@Slf4j +@AllArgsConstructor +@Component +public class MqttMessageReceiver implements MessageHandler { + + @Override + public void handleMessage(Message message) throws MessagingException { + try { + MessageHeaders headers = message.getHeaders(); + //获取消息Topic + String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); + log.info("[获取到的消息的topic :]{} ", receivedTopic); + //获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload :]{} ", payload); + //todo .... + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java new file mode 100644 index 0000000..1615fa0 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java @@ -0,0 +1,45 @@ +package com.casic.missiles.mqtt.sender; + +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.mqtt.MqttGateway; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@AllArgsConstructor +public class MqttMessageSender { + + private MqttGateway mqttGateway; + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody){ + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message){ + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java new file mode 100644 index 0000000..c8b7a38 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java @@ -0,0 +1,41 @@ +package com.casic.missiles.mqtt.sender; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +public class MqttOutboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + mqttConfig.getClientId() + "-" + System.currentTimeMillis() + System.currentTimeMillis(), factory); + + messageHandler.setDefaultQos(0); + //开启异步 + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(mqttConfig.getTopic()); + return messageHandler; + } + +} diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java new file mode 100644 index 0000000..0fe2ded --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java @@ -0,0 +1,70 @@ +package com.casic.missiles.mqtt.receiver; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + private MqttMessageReceiver mqttMessageReceiver; + + /** + * 此处可以使用其他消息通道 + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + * + * @return + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + * + * @param + * @param + * @return + */ + @Bean + public MessageProducerSupport mqttInbound() { + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); + + adapter.setCompletionTimeout(60000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setRecoveryInterval(10000); + adapter.setQos(0); + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + * + * @return + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java new file mode 100644 index 0000000..282b99d --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java @@ -0,0 +1,32 @@ +package com.casic.missiles.mqtt.receiver; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Component; + +@Slf4j +@AllArgsConstructor +@Component +public class MqttMessageReceiver implements MessageHandler { + + @Override + public void handleMessage(Message message) throws MessagingException { + try { + MessageHeaders headers = message.getHeaders(); + //获取消息Topic + String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); + log.info("[获取到的消息的topic :]{} ", receivedTopic); + //获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload :]{} ", payload); + //todo .... + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java new file mode 100644 index 0000000..1615fa0 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java @@ -0,0 +1,45 @@ +package com.casic.missiles.mqtt.sender; + +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.mqtt.MqttGateway; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@AllArgsConstructor +public class MqttMessageSender { + + private MqttGateway mqttGateway; + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody){ + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message){ + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java new file mode 100644 index 0000000..c8b7a38 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java @@ -0,0 +1,41 @@ +package com.casic.missiles.mqtt.sender; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +public class MqttOutboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + mqttConfig.getClientId() + "-" + System.currentTimeMillis() + System.currentTimeMillis(), factory); + + messageHandler.setDefaultQos(0); + //开启异步 + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(mqttConfig.getTopic()); + return messageHandler; + } + +} diff --git a/casic-web/pom.xml b/casic-web/pom.xml index 466a6f8..ca61dc4 100644 --- a/casic-web/pom.xml +++ b/casic-web/pom.xml @@ -56,6 +56,8 @@ org.springframework.boot spring-boot-starter-jdbc + + com.casic diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java new file mode 100644 index 0000000..0fe2ded --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java @@ -0,0 +1,70 @@ +package com.casic.missiles.mqtt.receiver; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + private MqttMessageReceiver mqttMessageReceiver; + + /** + * 此处可以使用其他消息通道 + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + * + * @return + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + * + * @param + * @param + * @return + */ + @Bean + public MessageProducerSupport mqttInbound() { + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); + + adapter.setCompletionTimeout(60000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setRecoveryInterval(10000); + adapter.setQos(0); + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + * + * @return + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java new file mode 100644 index 0000000..282b99d --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java @@ -0,0 +1,32 @@ +package com.casic.missiles.mqtt.receiver; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Component; + +@Slf4j +@AllArgsConstructor +@Component +public class MqttMessageReceiver implements MessageHandler { + + @Override + public void handleMessage(Message message) throws MessagingException { + try { + MessageHeaders headers = message.getHeaders(); + //获取消息Topic + String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); + log.info("[获取到的消息的topic :]{} ", receivedTopic); + //获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload :]{} ", payload); + //todo .... + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java new file mode 100644 index 0000000..1615fa0 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java @@ -0,0 +1,45 @@ +package com.casic.missiles.mqtt.sender; + +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.mqtt.MqttGateway; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@AllArgsConstructor +public class MqttMessageSender { + + private MqttGateway mqttGateway; + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody){ + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message){ + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java new file mode 100644 index 0000000..c8b7a38 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java @@ -0,0 +1,41 @@ +package com.casic.missiles.mqtt.sender; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +public class MqttOutboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + mqttConfig.getClientId() + "-" + System.currentTimeMillis() + System.currentTimeMillis(), factory); + + messageHandler.setDefaultQos(0); + //开启异步 + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(mqttConfig.getTopic()); + return messageHandler; + } + +} diff --git a/casic-web/pom.xml b/casic-web/pom.xml index 466a6f8..ca61dc4 100644 --- a/casic-web/pom.xml +++ b/casic-web/pom.xml @@ -56,6 +56,8 @@ org.springframework.boot spring-boot-starter-jdbc + + com.casic diff --git a/casic-web/src/main/resources/config/application-dev.yml b/casic-web/src/main/resources/config/application-dev.yml index 8c18554..c6c9ddf 100644 --- a/casic-web/src/main/resources/config/application-dev.yml +++ b/casic-web/src/main/resources/config/application-dev.yml @@ -4,7 +4,7 @@ spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://111.198.10.15:11336/casic_template2.0?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=UTC&&allowMultiQueries=true + url: jdbc:mysql://111.198.10.15:11336/casic_robot_inspection?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=UTC&&allowMultiQueries=true username: root password: Casic203 jms: diff --git a/casic-server/pom.xml b/casic-server/pom.xml index d0c19cd..1c8f8f0 100644 --- a/casic-server/pom.xml +++ b/casic-server/pom.xml @@ -35,6 +35,17 @@ ${boot.version} provided + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + 5.4.3 + diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..90e8a83 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttConfiguration.java @@ -0,0 +1,50 @@ +package com.casic.missiles.mqtt; + +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Data +@Configuration +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfiguration { + + private String username; + private String password; + private String url; + private String clientId; + private String topic = "TOPIC_DEFAULT"; + private Integer completionTimeout = 2000; + + /** + * 注册MQTT客户端工厂 + * + * @return 客户端工厂 + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: + // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 + // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 + options.setCleanSession(true); + //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 + // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 + options.setConnectionTimeout(0); + //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 + options.setKeepAliveInterval(90); + //自动重新连接 + options.setAutomaticReconnect(true); + options.setUserName(this.getUsername()); + options.setPassword(this.getPassword().toCharArray()); + options.setServerURIs(new String[]{this.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java new file mode 100644 index 0000000..150d5e8 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/MqttGateway.java @@ -0,0 +1,43 @@ +package com.casic.missiles.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Component +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param payload 内容 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 对消息处理的几种机制。 + * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
+ * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
+ * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 + * @param payload 消息体 + * @return void + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java new file mode 100644 index 0000000..0fe2ded --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttInboundConfiguration.java @@ -0,0 +1,70 @@ +package com.casic.missiles.mqtt.receiver; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + private MqttMessageReceiver mqttMessageReceiver; + + /** + * 此处可以使用其他消息通道 + * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 + * + * @return + */ + @Bean + public MessageChannel mqttInBoundChannel() { + return new DirectChannel(); + } + + /** + * 适配器, 两个topic共用一个adapter + * 客户端作为消费者,订阅主题,消费消息 + * + * @param + * @param + * @return + */ + @Bean + public MessageProducerSupport mqttInbound() { + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); + + adapter.setCompletionTimeout(60000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setRecoveryInterval(10000); + adapter.setQos(0); + adapter.setOutputChannel(mqttInBoundChannel()); + return adapter; + } + + /** + * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 + * + * @return + */ + @Bean + @ServiceActivator(inputChannel = "mqttInBoundChannel") + public MessageHandler mqttMessageHandler() { + return this.mqttMessageReceiver; + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java new file mode 100644 index 0000000..282b99d --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/receiver/MqttMessageReceiver.java @@ -0,0 +1,32 @@ +package com.casic.missiles.mqtt.receiver; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Component; + +@Slf4j +@AllArgsConstructor +@Component +public class MqttMessageReceiver implements MessageHandler { + + @Override + public void handleMessage(Message message) throws MessagingException { + try { + MessageHeaders headers = message.getHeaders(); + //获取消息Topic + String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); + log.info("[获取到的消息的topic :]{} ", receivedTopic); + //获取消息体 + String payload = (String) message.getPayload(); + log.info("[获取到的消息的payload :]{} ", payload); + //todo .... + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java new file mode 100644 index 0000000..1615fa0 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttMessageSender.java @@ -0,0 +1,45 @@ +package com.casic.missiles.mqtt.sender; + +import com.alibaba.fastjson.JSONObject; +import com.casic.missiles.mqtt.MqttGateway; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@AllArgsConstructor +public class MqttMessageSender { + + private MqttGateway mqttGateway; + + /** + * 发送mqtt消息 + * @param topic 主题 + * @param message 内容 + * @return void + */ + public void send(String topic, String message) { + mqttGateway.sendToMqtt(topic, message); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param messageBody 消息体 + * @return void + */ + public void send(String topic, int qos, JSONObject messageBody){ + mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); + } + + /** + * 发送包含qos的消息 + * @param topic 主题 + * @param qos 质量 + * @param message 消息体 + * @return void + */ + public void send(String topic, int qos, byte[] message){ + mqttGateway.sendToMqtt(topic, qos, message); + } +} diff --git a/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java new file mode 100644 index 0000000..c8b7a38 --- /dev/null +++ b/casic-server/src/main/java/com/casic/missiles/mqtt/sender/MqttOutboundConfiguration.java @@ -0,0 +1,41 @@ +package com.casic.missiles.mqtt.sender; + +import com.casic.missiles.mqtt.MqttConfiguration; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Slf4j +@AllArgsConstructor +@Configuration +public class MqttOutboundConfiguration { + + private MqttConfiguration mqttConfig; + private MqttPahoClientFactory factory; + + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + mqttConfig.getClientId() + "-" + System.currentTimeMillis() + System.currentTimeMillis(), factory); + + messageHandler.setDefaultQos(0); + //开启异步 + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(mqttConfig.getTopic()); + return messageHandler; + } + +} diff --git a/casic-web/pom.xml b/casic-web/pom.xml index 466a6f8..ca61dc4 100644 --- a/casic-web/pom.xml +++ b/casic-web/pom.xml @@ -56,6 +56,8 @@ org.springframework.boot spring-boot-starter-jdbc + + com.casic diff --git a/casic-web/src/main/resources/config/application-dev.yml b/casic-web/src/main/resources/config/application-dev.yml index 8c18554..c6c9ddf 100644 --- a/casic-web/src/main/resources/config/application-dev.yml +++ b/casic-web/src/main/resources/config/application-dev.yml @@ -4,7 +4,7 @@ spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://111.198.10.15:11336/casic_template2.0?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=UTC&&allowMultiQueries=true + url: jdbc:mysql://111.198.10.15:11336/casic_robot_inspection?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=UTC&&allowMultiQueries=true username: root password: Casic203 jms: diff --git a/casic-web/src/main/resources/config/application.yml b/casic-web/src/main/resources/config/application.yml index 200ca46..33cce3f 100644 --- a/casic-web/src/main/resources/config/application.yml +++ b/casic-web/src/main/resources/config/application.yml @@ -3,7 +3,7 @@ ########################################################## spring: profiles: - active: @activatedProperties@ + active: dev servlet: multipart: max-file-size: 50MB