|
|
@@ -1,7 +1,6 @@
|
|
|
package com.zd.alg.mqtt;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
import com.zd.alg.alarm.service.IAlarmLogService;
|
|
|
import com.zd.alg.alarm.utils.AlarmUtil;
|
|
|
import com.zd.algorithm.api.alarm.entity.AlarmEntrty;
|
|
|
@@ -9,7 +8,6 @@ import com.zd.algorithm.api.alarm.entity.AlarmLog;
|
|
|
import com.zd.algorithm.api.alarm.entity.Routes;
|
|
|
import com.zd.algorithm.api.alarm.entity.SendTypes;
|
|
|
import com.zd.common.core.utils.DateUtils;
|
|
|
-import nonapi.io.github.classgraph.json.JSONUtils;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
@@ -28,7 +26,11 @@ import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannel
|
|
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
-import org.springframework.messaging.*;
|
|
|
+import org.springframework.messaging.Message;
|
|
|
+import org.springframework.messaging.MessageChannel;
|
|
|
+import org.springframework.messaging.MessageHandler;
|
|
|
+import org.springframework.messaging.MessagingException;
|
|
|
+
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
@@ -130,9 +132,7 @@ public class MqttConfig {
|
|
|
@Bean
|
|
|
@ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
|
|
|
public MessageHandler mqttOutbound() {
|
|
|
- MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
|
|
|
- producerClientId,
|
|
|
- mqttClientFactory());
|
|
|
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(producerClientId, mqttClientFactory());
|
|
|
messageHandler.setAsync(true);
|
|
|
messageHandler.setDefaultTopic(defaultTopic);
|
|
|
return messageHandler;
|
|
|
@@ -164,33 +164,24 @@ public class MqttConfig {
|
|
|
|
|
|
/**
|
|
|
* 消费者消息处理
|
|
|
- *设备心跳
|
|
|
+ * 设备心跳
|
|
|
*/
|
|
|
@Bean
|
|
|
@ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
|
|
|
public MessageHandler mqttInbound() {
|
|
|
-
|
|
|
return new MessageHandler() {
|
|
|
@Override
|
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
|
- logger.info("消息接口提醒: header"+message.getHeaders());
|
|
|
- logger.info("消息接口提醒: Payload"+message.getPayload());
|
|
|
+ logger.info("预警消息接收内容: message={}", message);
|
|
|
String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
|
|
|
-
|
|
|
- logger.info("MQTT 接收 topic: {}",receivedTopic);
|
|
|
-
|
|
|
String msg = (String) message.getPayload();
|
|
|
- logger.info("MQTT 接收 消息体: {}",msg);
|
|
|
-
|
|
|
try {
|
|
|
- String type = SendTypes.All+"";
|
|
|
- Map<String,Object> map = JSON.parseObject(msg, Map.class);
|
|
|
-
|
|
|
- map = JSON.parseObject(map.get("data")+"", Map.class);
|
|
|
-
|
|
|
+ String type = SendTypes.All + "";
|
|
|
+ Map<String, Object> map = JSON.parseObject(msg, Map.class);
|
|
|
+ map = JSON.parseObject(map.get("data") + "", Map.class);
|
|
|
String text = (String) map.get("text");
|
|
|
- List<String> list = JSON.parseArray(map.get("to")+"", String.class);
|
|
|
- if(CollectionUtils.isEmpty(list)){
|
|
|
+ List<String> list = JSON.parseArray(map.get("to") + "", String.class);
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
logger.error("接收告警消息手机号为空!");
|
|
|
return;
|
|
|
}
|
|
|
@@ -198,19 +189,15 @@ public class MqttConfig {
|
|
|
Set<String> set = new HashSet<>(list);
|
|
|
String[] to = new String[set.size()];
|
|
|
set.toArray(to);
|
|
|
-
|
|
|
String dataType = (String) map.get("type");
|
|
|
- if(map.get("type") != null && ((SendTypes.All+"").equals(dataType)
|
|
|
- || (SendTypes.Call+"").equals(dataType) || (SendTypes.SMS+"").equals(dataType))){
|
|
|
+ if (map.get("type") != null && ((SendTypes.All + "").equals(dataType) || (SendTypes.Call + "").equals(dataType) || (SendTypes.SMS + "").equals(dataType))) {
|
|
|
type = dataType;
|
|
|
}
|
|
|
|
|
|
AlarmEntrty alarmEntrty = new AlarmEntrty(Routes.NoticePush, to, type, text);
|
|
|
-
|
|
|
map = alarmUtil.sendPost(alarmEntrty);
|
|
|
-
|
|
|
String data = (String) map.get("Reply");
|
|
|
- if("OK".equals(data)){
|
|
|
+ if ("OK".equals(data)) {
|
|
|
String[] phones = alarmEntrty.getTo();
|
|
|
for (String phone : phones) {
|
|
|
AlarmLog alarmLog = new AlarmLog();
|
|
|
@@ -220,17 +207,14 @@ public class MqttConfig {
|
|
|
alarmLog.setPhone(phone);
|
|
|
alarmLog.setNotice(alarmEntrty.getText());
|
|
|
alarmLog.setCreateTime(DateUtils.getNowDate());
|
|
|
-
|
|
|
alarmLogService.insertAlarmLog(alarmLog);
|
|
|
}
|
|
|
- }else {
|
|
|
- logger.error("报警电话推送错误:" + JSONObject.toJSONString(map));
|
|
|
+ } else {
|
|
|
+ logger.error("报警电话推送错误:" + JSON.toJSONString(map));
|
|
|
}
|
|
|
- }catch (Exception e){
|
|
|
- e.printStackTrace();
|
|
|
- logger.error("报警处理数据异常:" + e.getMessage());
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("报警处理数据异常:{}", e);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
};
|
|
|
}
|