Kaynağa Gözat

mqtt消息接收逻辑修改

xuxiaofei 3 yıl önce
ebeveyn
işleme
2ea3c587d8

+ 78 - 64
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/mqtt/config/MqttConfig.java

@@ -163,6 +163,7 @@ public class MqttConfig {
 
     /**
      * 通道2
+     *
      * @return
      */
     @Bean(name = MQTT_OUTBOUND_CHANNEL)
@@ -172,6 +173,7 @@ public class MqttConfig {
 
     /**
      * 配置client2,监听的willTopic
+     *
      * @return
      */
     @Bean
@@ -238,80 +240,30 @@ public class MqttConfig {
 
     /**
      * 通过通道2获取数据
+     *
      * @return
      */
     @Bean
     @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
-    public  MessageHandler handlerByte() {
-        logger.info("=======进入消息处理器2=====");
+    public MessageHandler handlerByte() {
         return message -> {
             String receivedTopic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
-
-            try{
-                if(!(message.getPayload() instanceof byte[])){
+            try {
+                if (!(message.getPayload() instanceof byte[])) {
                     return;
                 }
-                logger.info("topic:"+receivedTopic);
-                if(receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)){
-
-                    logger.info("消息处理器2:message:"+message.getPayload().toString());
-                    //String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
-                    String messageStr = bytes2HexString((byte[])message.getPayload());
-                    logger.info("灭火消息,主机回复指令:"+messageStr);
-                    //返回非指令时不向下执行
-                    if(messageStr!=null && messageStr.length()<30){
-                        logger.info("灭火消息指令不正确");
-                        return;
-                    }
-
-                    //获取采集器编号
-                    receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
-                    //根据状态指令获取主机状态
-                    JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
-                    logger.info("灭火主机状态:"+jsonObject+"采集器编号"+receivedTopic+",响应时间key:"+CacheConstants.FIRE_DEVICE_RESPOND_TIME+receivedTopic);
-                    //redis存值 采集器编号:主机状态json
-                    redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
-                    //redis存值 灭火主机响应时间
-                    redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME+receivedTopic, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
-                    logger.info("灭火消息,redis存值完毕========");
-
-                    Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
-                    //灭火主机发生预警
-                    if (fireNumber > 0) {
-                        logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
-                        //发现灭火设备异常,发送消息通知小程序和PC
-                        commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN+receivedTopic,jsonObject.toJSONString());
-                        if (fireNumber == 2) {
-                            logger.info("=====>>>>>>发现自动灭火");
-                            //通知预案
-                            //commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_CATCHFIRE+receivedTopic,jsonObject.toJSONString());
-                            OneClickFireDTO dto = new OneClickFireDTO();
-                            dto.setDeviceCode(receivedTopic);
-                            dto.setIfFire(1);
-                            dto.setAutomanual(1);
-                            remoteLaboratoryService.oneClickFire(dto);
-
-                            String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
-                            //redis中是否已存在日志已添加标识
-                            if(StringUtils.isEmpty(isExist)){
-                                //自动灭火日志添加
-                                R r = remoteFireDeviceService.addFireLog(receivedTopic);
-                                //redis存值,灭火日志已经添加标识
-                                redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, r.getData(), 60L, TimeUnit.MINUTES);
-                                logger.info("灭火日志添加执行结果," + r.getData());
-                            }
-                        }
-                    }
-
+                if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
+                    logger.info("消息处理器2开始处理=====");
+                    logger.info("topic:" + receivedTopic);
+                    ReturnMessageProcessing(message,receivedTopic);
                 }
-            }catch (Exception e){
+            } catch (Exception e) {
                 e.printStackTrace();
             }
         };
     }
 
 
-
     /**
      * 消费者消息处理
      * 设备心跳
@@ -327,8 +279,8 @@ public class MqttConfig {
             public void handleMessage(Message<?> message) throws MessagingException {
                 MessageHeaders messageHeaders = message.getHeaders();
                 String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
-                logger.info("[通道] - [{}]",receivedTopic);
-                logger.info("[消息] - [{}]",message.getPayload());
+                logger.info("[通道] - [{}]", receivedTopic);
+                logger.info("[消息] - [{}]", message.getPayload());
                 String messageStr = message.getPayload().toString();
                 if (receivedTopic.startsWith(devicePrefix)) {
                     if (receivedTopic.contains("788D4C6C6187ABC")) {
@@ -379,7 +331,12 @@ public class MqttConfig {
                             String relayCode = prefix[prefix.length - 1];
                             TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
                         }
-                }}
+                    }
+                }else if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
+                    logger.info("消息处理器1开始处理=====");
+                    logger.info("topic:" + receivedTopic);
+                    ReturnMessageProcessing(message,receivedTopic);
+                }
                 // TODO
 //                MessageBody messageBody = JSONObject.parseObject(messageStr, MessageBody.class);
 //                mqttResHandler.deal(JSONUtil.toBean(msg,com.ffy.mqtt.model.Message.class));
@@ -391,8 +348,65 @@ public class MqttConfig {
         };
     }
 
+    /***
+     * 灭火主机返回消息处理
+     * @param message
+     * @param receivedTopic
+     */
+    public void ReturnMessageProcessing(Message<?> message, String receivedTopic) {
+        logger.info("灭火消息处理器:" + message.getPayload().toString());
+        //String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
+        String messageStr = bytes2HexString((byte[]) message.getPayload());
+        logger.info("灭火消息,主机回复指令:" + messageStr);
+        //返回非指令时不向下执行
+        if (messageStr != null && messageStr.length() < 30) {
+            logger.info("灭火消息指令不正确");
+            return;
+        }
+
+        //获取采集器编号
+        receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
+        //根据状态指令获取主机状态
+        JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
+        logger.info("灭火主机状态:" + jsonObject + "采集器编号" + receivedTopic + ",响应时间key:" + CacheConstants.FIRE_DEVICE_RESPOND_TIME + receivedTopic);
+        //redis存值 采集器编号:主机状态json
+        redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
+        //redis存值 灭火主机响应时间
+        redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME + receivedTopic, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
+        logger.info("灭火消息,redis存值完毕========");
+
+        Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
+        //灭火主机发生预警
+        if (fireNumber > 0) {
+            logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
+            //发现灭火设备异常,发送消息通知小程序和PC
+            commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN + receivedTopic, jsonObject.toJSONString());
+            if (fireNumber == 2) {
+                logger.info("=====>>>>>>发现自动灭火");
+                //通知预案
+                //commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_CATCHFIRE+receivedTopic,jsonObject.toJSONString());
+                OneClickFireDTO dto = new OneClickFireDTO();
+                dto.setDeviceCode(receivedTopic);
+                dto.setIfFire(1);
+                dto.setAutomanual(1);
+                remoteLaboratoryService.oneClickFire(dto);
+
+                String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
+                //redis中是否已存在日志已添加标识
+                if (StringUtils.isEmpty(isExist)) {
+                    //自动灭火日志添加
+                    R r = remoteFireDeviceService.addFireLog(receivedTopic);
+                    //redis存值,灭火日志已经添加标识
+                    redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, r.getData(), 60L, TimeUnit.MINUTES);
+                    logger.info("灭火日志添加执行结果," + r.getData());
+                }
+            }
+        }
+    }
+
     /**
      * byte 数组转string
+     *
      * @param b
      * @return
      */
@@ -404,14 +418,14 @@ public class MqttConfig {
             if (hex.length() == 1) {
                 hex = '0' + hex;
             }
-            r += hex.toUpperCase()+" ";
+            r += hex.toUpperCase() + " ";
         }
 
         return r;
     }
 
     public static String byte2Hex(byte[] bytes) {
-        logger.info("byte长度:"+bytes.length);
+        logger.info("byte长度:" + bytes.length);
         StringBuilder builder = new StringBuilder();
         String temp;
         for (byte aByte : bytes) {