linfutong преди 3 години
родител
ревизия
3a36780048

+ 0 - 3
zd-modules/zd-algorithm/src/main/java/com/zd/alg/forward/serivce/mqtt/CommonSend.java

@@ -40,7 +40,4 @@ public class CommonSend {
     {
         mqttProducer.sendToMqtt(topic, QOS, JSON.toJSONString(messageBody));
     }
-
-
-
 }

+ 36 - 40
zd-modules/zd-algorithm/src/main/java/com/zd/alg/mqtt/MqttConfig.java

@@ -25,7 +25,12 @@ import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
 import org.springframework.integration.mqtt.support.MqttHeaders;
 import org.springframework.messaging.*;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * mqtt配置
@@ -47,7 +52,7 @@ public class MqttConfig {
 
     private static final byte[] WILL_DATA;
 
-    private static String WILL_TOPIC = "willTopic";
+    private static String WILL_TOPIC = "alert";
 
     static {
         WILL_DATA = "offline".getBytes();
@@ -79,9 +84,6 @@ public class MqttConfig {
     @Value("${mqtt.consumer.maxInflight}")
     private Integer maxInflight;
 
-    private MqttPahoMessageDrivenChannelAdapter adapter;
-
-
     /**
      * 连接mqtt配置
      *
@@ -150,7 +152,7 @@ public class MqttConfig {
     @Bean
     public MessageProducer inbound() {
         // 可同时消费(订阅)多个Topic
-        adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(2);
@@ -180,44 +182,38 @@ public class MqttConfig {
     @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
     public MessageHandler mqttInbound() {
 
-        return new MessageHandler() {
-            @Override
-            public void handleMessage(Message<?> message) throws MessagingException {
-                MessageHeaders messageHeaders = message.getHeaders();
-                //String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
-                String receivedTopic = (String) messageHeaders.get(MqttHeaders.TOPIC);
-//                logger.info("[通道] - [{}]",receivedTopic);
-//                logger.info("[消息] - [{}]",message.getPayload());
-                String messageStr = message.getPayload().toString();
-//                logger.info("======= receivedTopic:" + receivedTopic + " messageStr:" + messageStr);
-                if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
-                    logger.info("灭火设备消息消费,receivedTopic:" + receivedTopic + " messageStr:" + messageStr);
-                    receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
-                    //根据状态指令获取主机状态
-                    JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
-                    Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
-                    //灭火主机发生预警
-                    if (fireNumber > 0) {
-                        logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
-                        mqttSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN,jsonObject.toJSONString());
-                        //自动灭火
-                        if (fireNumber == 2) {
-                            //redis中是否已存在日志已添加标识
-                            String isExist = redisService.getCacheObject(FireDeviceStatusTask.TOPIC_OUTFIRE_ISEXIST);
-                            if(StringUtils.isEmpty(isExist)){
-                                //自动灭火日志添加
-                                String str = deviceLogService.insertFireAutomatic(receivedTopic);
-                                //redis存值,灭火日志已经添加标识
-                                redisService.setCacheObject(FireDeviceStatusTask.TOPIC_OUTFIRE_ISEXIST, str, 60L, TimeUnit.MINUTES);
-                                logger.info("自动灭火日志添加执行结果," + str);
-                            }
+        return message -> {
+            logger.info("=========>>> [消息头] - [{}]", message.getHeaders()+",[消息内容]: " + message.getPayload());
+            MessageHeaders messageHeaders = message.getHeaders();
+            String receivedTopic = (String) messageHeaders.get(MqttHeaders.TOPIC);
+            String messageStr = message.getPayload().toString();
+            if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
+                logger.info("灭火设备消息消费,receivedTopic:" + receivedTopic + " messageStr:" + messageStr);
+                receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
+                //根据状态指令获取主机状态
+                JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
+                Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
+                //灭火主机发生预警
+                if (fireNumber > 0) {
+                    logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
+                    mqttSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN,jsonObject.toJSONString());
+                    //自动灭火
+                    if (fireNumber == 2) {
+                        //redis中是否已存在日志已添加标识
+                        String isExist = redisService.getCacheObject(FireDeviceStatusTask.TOPIC_OUTFIRE_ISEXIST);
+                        if(StringUtils.isEmpty(isExist)){
+                            //自动灭火日志添加
+                            String str = deviceLogService.insertFireAutomatic(receivedTopic);
+                            //redis存值,灭火日志已经添加标识
+                            redisService.setCacheObject(FireDeviceStatusTask.TOPIC_OUTFIRE_ISEXIST, str, 60L, TimeUnit.MINUTES);
+                            logger.info("自动灭火日志添加执行结果," + str);
                         }
                     }
-                    //redis存值 采集器编号:主机状态json
-                    redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
-                    //redis存值 灭火主机响应时间
-                    redisService.setCacheObject(FireDeviceStatusTask.TOPIC_FD_RESPOND_TIME, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
                 }
+                //redis存值 采集器编号:主机状态json
+                redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
+                //redis存值 灭火主机响应时间
+                redisService.setCacheObject(FireDeviceStatusTask.TOPIC_FD_RESPOND_TIME, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
             }
         };
     }