|
@@ -163,6 +163,7 @@ public class MqttConfig {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 通道2
|
|
* 通道2
|
|
|
|
|
+ *
|
|
|
* @return
|
|
* @return
|
|
|
*/
|
|
*/
|
|
|
@Bean(name = MQTT_OUTBOUND_CHANNEL)
|
|
@Bean(name = MQTT_OUTBOUND_CHANNEL)
|
|
@@ -172,6 +173,7 @@ public class MqttConfig {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 配置client2,监听的willTopic
|
|
* 配置client2,监听的willTopic
|
|
|
|
|
+ *
|
|
|
* @return
|
|
* @return
|
|
|
*/
|
|
*/
|
|
|
@Bean
|
|
@Bean
|
|
@@ -238,80 +240,30 @@ public class MqttConfig {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 通过通道2获取数据
|
|
* 通过通道2获取数据
|
|
|
|
|
+ *
|
|
|
* @return
|
|
* @return
|
|
|
*/
|
|
*/
|
|
|
@Bean
|
|
@Bean
|
|
|
@ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
|
|
@ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
|
|
|
- public MessageHandler handlerByte() {
|
|
|
|
|
- logger.info("=======进入消息处理器2=====");
|
|
|
|
|
|
|
+ public MessageHandler handlerByte() {
|
|
|
return message -> {
|
|
return message -> {
|
|
|
String receivedTopic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
|
|
String receivedTopic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
|
|
|
-
|
|
|
|
|
- try{
|
|
|
|
|
- if(!(message.getPayload() instanceof byte[])){
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (!(message.getPayload() instanceof byte[])) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
- logger.info("topic:"+receivedTopic);
|
|
|
|
|
- if(receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)){
|
|
|
|
|
-
|
|
|
|
|
- logger.info("消息处理器2:message:"+message.getPayload().toString());
|
|
|
|
|
- //String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
|
|
|
|
|
- String messageStr = bytes2HexString((byte[])message.getPayload());
|
|
|
|
|
- logger.info("灭火消息,主机回复指令:"+messageStr);
|
|
|
|
|
- //返回非指令时不向下执行
|
|
|
|
|
- if(messageStr!=null && messageStr.length()<30){
|
|
|
|
|
- logger.info("灭火消息指令不正确");
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- //获取采集器编号
|
|
|
|
|
- receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
|
|
|
|
|
- //根据状态指令获取主机状态
|
|
|
|
|
- JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
|
|
|
|
|
- logger.info("灭火主机状态:"+jsonObject+"采集器编号"+receivedTopic+",响应时间key:"+CacheConstants.FIRE_DEVICE_RESPOND_TIME+receivedTopic);
|
|
|
|
|
- //redis存值 采集器编号:主机状态json
|
|
|
|
|
- redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
|
|
|
|
|
- //redis存值 灭火主机响应时间
|
|
|
|
|
- redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME+receivedTopic, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
|
|
|
|
|
- logger.info("灭火消息,redis存值完毕========");
|
|
|
|
|
-
|
|
|
|
|
- Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
|
|
|
|
|
- //灭火主机发生预警
|
|
|
|
|
- if (fireNumber > 0) {
|
|
|
|
|
- logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
|
|
|
|
|
- //发现灭火设备异常,发送消息通知小程序和PC
|
|
|
|
|
- commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN+receivedTopic,jsonObject.toJSONString());
|
|
|
|
|
- if (fireNumber == 2) {
|
|
|
|
|
- logger.info("=====>>>>>>发现自动灭火");
|
|
|
|
|
- //通知预案
|
|
|
|
|
- //commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_CATCHFIRE+receivedTopic,jsonObject.toJSONString());
|
|
|
|
|
- OneClickFireDTO dto = new OneClickFireDTO();
|
|
|
|
|
- dto.setDeviceCode(receivedTopic);
|
|
|
|
|
- dto.setIfFire(1);
|
|
|
|
|
- dto.setAutomanual(1);
|
|
|
|
|
- remoteLaboratoryService.oneClickFire(dto);
|
|
|
|
|
-
|
|
|
|
|
- String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
|
|
|
|
|
- //redis中是否已存在日志已添加标识
|
|
|
|
|
- if(StringUtils.isEmpty(isExist)){
|
|
|
|
|
- //自动灭火日志添加
|
|
|
|
|
- R r = remoteFireDeviceService.addFireLog(receivedTopic);
|
|
|
|
|
- //redis存值,灭火日志已经添加标识
|
|
|
|
|
- redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, r.getData(), 60L, TimeUnit.MINUTES);
|
|
|
|
|
- logger.info("灭火日志添加执行结果," + r.getData());
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+ if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
|
|
|
|
|
+ logger.info("消息处理器2开始处理=====");
|
|
|
|
|
+ logger.info("topic:" + receivedTopic);
|
|
|
|
|
+ ReturnMessageProcessing(message,receivedTopic);
|
|
|
}
|
|
}
|
|
|
- }catch (Exception e){
|
|
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
|
/**
|
|
/**
|
|
|
* 消费者消息处理
|
|
* 消费者消息处理
|
|
|
* 设备心跳
|
|
* 设备心跳
|
|
@@ -327,8 +279,8 @@ public class MqttConfig {
|
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
|
MessageHeaders messageHeaders = message.getHeaders();
|
|
MessageHeaders messageHeaders = message.getHeaders();
|
|
|
String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
|
|
String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
|
|
|
- logger.info("[通道] - [{}]",receivedTopic);
|
|
|
|
|
- logger.info("[消息] - [{}]",message.getPayload());
|
|
|
|
|
|
|
+ logger.info("[通道] - [{}]", receivedTopic);
|
|
|
|
|
+ logger.info("[消息] - [{}]", message.getPayload());
|
|
|
String messageStr = message.getPayload().toString();
|
|
String messageStr = message.getPayload().toString();
|
|
|
if (receivedTopic.startsWith(devicePrefix)) {
|
|
if (receivedTopic.startsWith(devicePrefix)) {
|
|
|
if (receivedTopic.contains("788D4C6C6187ABC")) {
|
|
if (receivedTopic.contains("788D4C6C6187ABC")) {
|
|
@@ -379,7 +331,12 @@ public class MqttConfig {
|
|
|
String relayCode = prefix[prefix.length - 1];
|
|
String relayCode = prefix[prefix.length - 1];
|
|
|
TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
|
|
TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
|
|
|
}
|
|
}
|
|
|
- }}
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ }else if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
|
|
|
|
|
+ logger.info("消息处理器1开始处理=====");
|
|
|
|
|
+ logger.info("topic:" + receivedTopic);
|
|
|
|
|
+ ReturnMessageProcessing(message,receivedTopic);
|
|
|
|
|
+ }
|
|
|
// TODO
|
|
// TODO
|
|
|
// MessageBody messageBody = JSONObject.parseObject(messageStr, MessageBody.class);
|
|
// MessageBody messageBody = JSONObject.parseObject(messageStr, MessageBody.class);
|
|
|
// mqttResHandler.deal(JSONUtil.toBean(msg,com.ffy.mqtt.model.Message.class));
|
|
// mqttResHandler.deal(JSONUtil.toBean(msg,com.ffy.mqtt.model.Message.class));
|
|
@@ -391,8 +348,65 @@ public class MqttConfig {
|
|
|
};
|
|
};
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /***
|
|
|
|
|
+ * 灭火主机返回消息处理
|
|
|
|
|
+ * @param message
|
|
|
|
|
+ * @param receivedTopic
|
|
|
|
|
+ */
|
|
|
|
|
+ public void ReturnMessageProcessing(Message<?> message, String receivedTopic) {
|
|
|
|
|
+ logger.info("灭火消息处理器:" + message.getPayload().toString());
|
|
|
|
|
+ //String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
|
|
|
|
|
+ String messageStr = bytes2HexString((byte[]) message.getPayload());
|
|
|
|
|
+ logger.info("灭火消息,主机回复指令:" + messageStr);
|
|
|
|
|
+ //返回非指令时不向下执行
|
|
|
|
|
+ if (messageStr != null && messageStr.length() < 30) {
|
|
|
|
|
+ logger.info("灭火消息指令不正确");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ //获取采集器编号
|
|
|
|
|
+ receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
|
|
|
|
|
+ //根据状态指令获取主机状态
|
|
|
|
|
+ JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
|
|
|
|
|
+ logger.info("灭火主机状态:" + jsonObject + "采集器编号" + receivedTopic + ",响应时间key:" + CacheConstants.FIRE_DEVICE_RESPOND_TIME + receivedTopic);
|
|
|
|
|
+ //redis存值 采集器编号:主机状态json
|
|
|
|
|
+ redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
|
|
|
|
|
+ //redis存值 灭火主机响应时间
|
|
|
|
|
+ redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME + receivedTopic, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
|
|
|
|
|
+ logger.info("灭火消息,redis存值完毕========");
|
|
|
|
|
+
|
|
|
|
|
+ Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
|
|
|
|
|
+ //灭火主机发生预警
|
|
|
|
|
+ if (fireNumber > 0) {
|
|
|
|
|
+ logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
|
|
|
|
|
+ //发现灭火设备异常,发送消息通知小程序和PC
|
|
|
|
|
+ commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN + receivedTopic, jsonObject.toJSONString());
|
|
|
|
|
+ if (fireNumber == 2) {
|
|
|
|
|
+ logger.info("=====>>>>>>发现自动灭火");
|
|
|
|
|
+ //通知预案
|
|
|
|
|
+ //commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_CATCHFIRE+receivedTopic,jsonObject.toJSONString());
|
|
|
|
|
+ OneClickFireDTO dto = new OneClickFireDTO();
|
|
|
|
|
+ dto.setDeviceCode(receivedTopic);
|
|
|
|
|
+ dto.setIfFire(1);
|
|
|
|
|
+ dto.setAutomanual(1);
|
|
|
|
|
+ remoteLaboratoryService.oneClickFire(dto);
|
|
|
|
|
+
|
|
|
|
|
+ String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
|
|
|
|
|
+ //redis中是否已存在日志已添加标识
|
|
|
|
|
+ if (StringUtils.isEmpty(isExist)) {
|
|
|
|
|
+ //自动灭火日志添加
|
|
|
|
|
+ R r = remoteFireDeviceService.addFireLog(receivedTopic);
|
|
|
|
|
+ //redis存值,灭火日志已经添加标识
|
|
|
|
|
+ redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, r.getData(), 60L, TimeUnit.MINUTES);
|
|
|
|
|
+ logger.info("灭火日志添加执行结果," + r.getData());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* byte 数组转string
|
|
* byte 数组转string
|
|
|
|
|
+ *
|
|
|
* @param b
|
|
* @param b
|
|
|
* @return
|
|
* @return
|
|
|
*/
|
|
*/
|
|
@@ -404,14 +418,14 @@ public class MqttConfig {
|
|
|
if (hex.length() == 1) {
|
|
if (hex.length() == 1) {
|
|
|
hex = '0' + hex;
|
|
hex = '0' + hex;
|
|
|
}
|
|
}
|
|
|
- r += hex.toUpperCase()+" ";
|
|
|
|
|
|
|
+ r += hex.toUpperCase() + " ";
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return r;
|
|
return r;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public static String byte2Hex(byte[] bytes) {
|
|
public static String byte2Hex(byte[] bytes) {
|
|
|
- logger.info("byte长度:"+bytes.length);
|
|
|
|
|
|
|
+ logger.info("byte长度:" + bytes.length);
|
|
|
StringBuilder builder = new StringBuilder();
|
|
StringBuilder builder = new StringBuilder();
|
|
|
String temp;
|
|
String temp;
|
|
|
for (byte aByte : bytes) {
|
|
for (byte aByte : bytes) {
|