|
@@ -2,12 +2,18 @@ package com.zd.laboratory.mqtt.config;
|
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil;
|
|
import cn.hutool.core.collection.CollUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.zd.algorithm.api.fire.RemoteFireDeviceService;
|
|
|
|
|
+import com.zd.common.core.redis.RedisService;
|
|
|
import com.zd.common.core.utils.SpringUtils;
|
|
import com.zd.common.core.utils.SpringUtils;
|
|
|
import com.zd.laboratory.event.LabMessageEvent;
|
|
import com.zd.laboratory.event.LabMessageEvent;
|
|
|
import com.zd.laboratory.event.SensorNewStatusEvent;
|
|
import com.zd.laboratory.event.SensorNewStatusEvent;
|
|
|
import com.zd.laboratory.mqtt.service.TerminalRouter;
|
|
import com.zd.laboratory.mqtt.service.TerminalRouter;
|
|
|
|
|
+import com.zd.laboratory.mqtt.service.impl.CommonSend;
|
|
|
|
|
+import com.zd.laboratory.utils.FireLaborUtil;
|
|
|
import com.zd.laboratory.utils.HexUtils;
|
|
import com.zd.laboratory.utils.HexUtils;
|
|
|
|
|
+import com.zd.model.constant.CacheConstants;
|
|
|
import com.zd.model.constant.MqttConstants;
|
|
import com.zd.model.constant.MqttConstants;
|
|
|
|
|
+import com.zd.model.domain.R;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
@@ -26,7 +32,9 @@ import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
import org.springframework.messaging.*;
|
|
import org.springframework.messaging.*;
|
|
|
|
|
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -51,6 +59,15 @@ public class MqttConfig {
|
|
|
|
|
|
|
|
public static final String MQTT_OUTBOUND_CHANNEL = "mqttOutboundChannel";
|
|
public static final String MQTT_OUTBOUND_CHANNEL = "mqttOutboundChannel";
|
|
|
|
|
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ CommonSend commonSend;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ RedisService redisService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ RemoteFireDeviceService remoteFireDeviceService;
|
|
|
|
|
+
|
|
|
@Value("${mqtt.username:}")
|
|
@Value("${mqtt.username:}")
|
|
|
private String username;
|
|
private String username;
|
|
|
|
|
|
|
@@ -198,8 +215,8 @@ public class MqttConfig {
|
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
|
MessageHeaders messageHeaders = message.getHeaders();
|
|
MessageHeaders messageHeaders = message.getHeaders();
|
|
|
String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
|
|
String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
|
|
|
-// logger.info("[通道] - [{}]",receivedTopic);
|
|
|
|
|
-// logger.info("[消息] - [{}]",message.getPayload());
|
|
|
|
|
|
|
+ logger.info("[通道] - [{}]",receivedTopic);
|
|
|
|
|
+ logger.info("[消息] - [{}]",message.getPayload());
|
|
|
String messageStr = message.getPayload().toString();
|
|
String messageStr = message.getPayload().toString();
|
|
|
if (receivedTopic.startsWith(devicePrefix)) {
|
|
if (receivedTopic.startsWith(devicePrefix)) {
|
|
|
if (receivedTopic.contains("788D4C6C6187ABC")) {
|
|
if (receivedTopic.contains("788D4C6C6187ABC")) {
|
|
@@ -250,8 +267,34 @@ public class MqttConfig {
|
|
|
String relayCode = prefix[prefix.length - 1];
|
|
String relayCode = prefix[prefix.length - 1];
|
|
|
TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
|
|
TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ }}else if(receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)){
|
|
|
|
|
+ logger.info("灭火消息消费,receivedTopic:" + receivedTopic + " messageStr:" + messageStr);
|
|
|
|
|
+ receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
|
|
|
|
|
+ //根据状态指令获取主机状态
|
|
|
|
|
+ JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
|
|
|
|
|
+ Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
|
|
|
|
|
+ //灭火主机发生预警
|
|
|
|
|
+ if (fireNumber > 0) {
|
|
|
|
|
+ logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
|
|
|
|
|
+ //发现灭火设备异常,发送消息通知
|
|
|
|
|
+ commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN+receivedTopic,jsonObject.toJSONString());
|
|
|
|
|
+ if (fireNumber == 2) {
|
|
|
|
|
+ logger.info("=====>>>>>>发现自动灭火");
|
|
|
|
|
+ String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
|
|
|
|
|
+ //redis中是否已存在日志已添加标识
|
|
|
|
|
+ if(StringUtils.isEmpty(isExist)){
|
|
|
|
|
+ //自动灭火日志添加
|
|
|
|
|
+ R r = remoteFireDeviceService.addFireLog(receivedTopic);
|
|
|
|
|
+ //redis存值,灭火日志已经添加标识
|
|
|
|
|
+ redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, r.getData(), 60L, TimeUnit.MINUTES);
|
|
|
|
|
+ logger.info("灭火日志添加执行结果," + r.getData());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
+ //redis存值 采集器编号:主机状态json
|
|
|
|
|
+ redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
|
|
|
|
|
+ //redis存值 灭火主机响应时间
|
|
|
|
|
+ redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TODO
|
|
// TODO
|