|
|
@@ -2,12 +2,15 @@ package com.zd.alg.mqtt;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.zd.alg.fire.service.IFireDeviceLogService;
|
|
|
-import com.zd.alg.fire.utils.FireDeviceStatusTask;
|
|
|
-import com.zd.alg.fire.utils.FireLaborUtil;
|
|
|
-import com.zd.common.core.redis.RedisService;
|
|
|
-import com.zd.model.constant.CacheConstants;
|
|
|
-import com.zd.model.constant.MqttConstants;
|
|
|
+import com.zd.alg.alarm.service.IAlarmLogService;
|
|
|
+import com.zd.alg.alarm.utils.AlarmUtil;
|
|
|
+import com.zd.algorithm.api.alarm.entity.AlarmEntrty;
|
|
|
+import com.zd.algorithm.api.alarm.entity.AlarmLog;
|
|
|
+import com.zd.algorithm.api.alarm.entity.Routes;
|
|
|
+import com.zd.algorithm.api.alarm.entity.SendTypes;
|
|
|
+import com.zd.common.core.utils.DateUtils;
|
|
|
+import nonapi.io.github.classgraph.json.JSONUtils;
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
import org.slf4j.Logger;
|
|
|
@@ -26,9 +29,10 @@ import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
import org.springframework.messaging.*;
|
|
|
-
|
|
|
-import javax.annotation.Resource;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
/**
|
|
|
* mqtt配置
|
|
|
@@ -37,21 +41,16 @@ import java.util.concurrent.TimeUnit;
|
|
|
*/
|
|
|
@Configuration
|
|
|
public class MqttConfig {
|
|
|
- static Logger logger = LoggerFactory.getLogger(MqttConfig.class);
|
|
|
-
|
|
|
- @Resource
|
|
|
- private RedisService redisService;
|
|
|
|
|
|
- @Resource
|
|
|
- private MqttSend mqttSend;
|
|
|
+ @Autowired
|
|
|
+ private AlarmUtil alarmUtil;
|
|
|
+ @Autowired
|
|
|
+ private IAlarmLogService alarmLogService;
|
|
|
|
|
|
- @Resource
|
|
|
- private IFireDeviceLogService deviceLogService;
|
|
|
+ static Logger logger = LoggerFactory.getLogger(MqttConfig.class);
|
|
|
|
|
|
private static final byte[] WILL_DATA;
|
|
|
|
|
|
- private static String WILL_TOPIC = "alert";
|
|
|
-
|
|
|
static {
|
|
|
WILL_DATA = "offline".getBytes();
|
|
|
}
|
|
|
@@ -82,10 +81,10 @@ public class MqttConfig {
|
|
|
@Value("${mqtt.consumer.maxInflight}")
|
|
|
private Integer maxInflight;
|
|
|
|
|
|
+ private MqttPahoMessageDrivenChannelAdapter adapter;
|
|
|
+
|
|
|
/**
|
|
|
* 连接mqtt配置
|
|
|
- *
|
|
|
- * @return
|
|
|
*/
|
|
|
@Bean
|
|
|
public MqttConnectOptions mqttConnectOptions() {
|
|
|
@@ -93,17 +92,16 @@ public class MqttConfig {
|
|
|
// false,服务器会保留客户端的连接记录
|
|
|
// true,表示每次连接到服务器都以新的身份连接
|
|
|
options.setCleanSession(false);
|
|
|
- options.setAutomaticReconnect(true);
|
|
|
options.setUserName(username);
|
|
|
options.setPassword(password.toCharArray());
|
|
|
- options.setMaxInflight(maxInflight);
|
|
|
+// options.setMaxInflight(maxInflight);
|
|
|
options.setServerURIs(StringUtils.split(url, ","));
|
|
|
//超时时间 单位为秒
|
|
|
- options.setConnectionTimeout(60);
|
|
|
+ options.setConnectionTimeout(10);
|
|
|
//会话心跳时间 单位: s, 间隔时间:1.5*20秒向客户端发送心跳判断客户端是否在线
|
|
|
options.setKeepAliveInterval(60);
|
|
|
//设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
|
|
|
- options.setWill(WILL_TOPIC, WILL_DATA, 2, false);
|
|
|
+// options.setWill("willTopic", WILL_DATA, 2, false);
|
|
|
return options;
|
|
|
}
|
|
|
|
|
|
@@ -128,8 +126,6 @@ public class MqttConfig {
|
|
|
|
|
|
/**
|
|
|
* 发送者消息处理
|
|
|
- *
|
|
|
- * @return
|
|
|
*/
|
|
|
@Bean
|
|
|
@ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
|
|
|
@@ -144,13 +140,11 @@ public class MqttConfig {
|
|
|
|
|
|
/**
|
|
|
* 消息订阅
|
|
|
- *
|
|
|
- * @return
|
|
|
*/
|
|
|
@Bean
|
|
|
public MessageProducer inbound() {
|
|
|
// 可同时消费(订阅)多个Topic
|
|
|
- MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
|
|
|
+ adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
|
|
|
adapter.setCompletionTimeout(5000);
|
|
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
adapter.setQos(2);
|
|
|
@@ -162,8 +156,6 @@ public class MqttConfig {
|
|
|
|
|
|
/**
|
|
|
* 消费者消息通道
|
|
|
- *
|
|
|
- * @return
|
|
|
*/
|
|
|
@Bean(name = MQTT_INBOUND_CHANNEL)
|
|
|
public MessageChannel mqttInboundChannel() {
|
|
|
@@ -172,62 +164,74 @@ public class MqttConfig {
|
|
|
|
|
|
/**
|
|
|
* 消费者消息处理
|
|
|
- * 设备心跳
|
|
|
- *
|
|
|
- * @return
|
|
|
+ *设备心跳
|
|
|
*/
|
|
|
- //@Bean
|
|
|
- //@ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
|
|
|
+ @Bean
|
|
|
+ @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
|
|
|
public MessageHandler mqttInbound() {
|
|
|
|
|
|
- return message -> {
|
|
|
- logger.info("=========>>> [消息头] - [{}]", message.getHeaders()+",[消息内容]: " + message.getPayload());
|
|
|
- MessageHeaders messageHeaders = message.getHeaders();
|
|
|
- String receivedTopic = (String) messageHeaders.get(MqttHeaders.TOPIC);
|
|
|
- String messageStr = message.getPayload().toString();
|
|
|
- /*if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
|
|
|
- logger.info("灭火设备消息消费,receivedTopic:" + receivedTopic + " messageStr:" + messageStr);
|
|
|
- receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
|
|
|
- //根据状态指令获取主机状态
|
|
|
- JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
|
|
|
- Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
|
|
|
- //灭火主机发生预警
|
|
|
- if (fireNumber > 0) {
|
|
|
- logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
|
|
|
- //发现灭火设备异常,发送消息通知
|
|
|
- mqttSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN+receivedTopic,jsonObject.toJSONString());
|
|
|
- //自动灭火
|
|
|
- if (fireNumber == 2) {
|
|
|
- //redis中是否已存在日志已添加标识
|
|
|
- String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
|
|
|
- if(StringUtils.isEmpty(isExist)){
|
|
|
- //自动灭火日志添加
|
|
|
- String str = deviceLogService.insertFireAutomatic(receivedTopic);
|
|
|
- //redis存值,灭火日志已经添加标识
|
|
|
- redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, str, 60L, TimeUnit.MINUTES);
|
|
|
- logger.info("自动灭火日志添加执行结果," + str);
|
|
|
+ return new MessageHandler() {
|
|
|
+ @Override
|
|
|
+ public void handleMessage(Message<?> message) throws MessagingException {
|
|
|
+ logger.info("消息接口提醒: header"+message.getHeaders());
|
|
|
+ logger.info("消息接口提醒: Payload"+message.getPayload());
|
|
|
+ String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
|
|
|
+
|
|
|
+ logger.info("MQTT 接收 topic: {}",receivedTopic);
|
|
|
+
|
|
|
+ String msg = (String) message.getPayload();
|
|
|
+ logger.info("MQTT 接收 消息体: {}",msg);
|
|
|
+
|
|
|
+ try {
|
|
|
+ String type = SendTypes.All+"";
|
|
|
+ Map<String,Object> map = JSON.parseObject(msg, Map.class);
|
|
|
+
|
|
|
+ map = JSON.parseObject(map.get("data")+"", Map.class);
|
|
|
+
|
|
|
+ String text = (String) map.get("text");
|
|
|
+ List<String> list = JSON.parseArray(map.get("to")+"", String.class);
|
|
|
+ if(CollectionUtils.isEmpty(list)){
|
|
|
+ logger.error("接收告警消息手机号为空!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Set<String> set = new HashSet<>(list);
|
|
|
+ String[] to = new String[set.size()];
|
|
|
+ set.toArray(to);
|
|
|
+
|
|
|
+ String dataType = (String) map.get("type");
|
|
|
+ if(map.get("type") != null && ((SendTypes.All+"").equals(dataType)
|
|
|
+ || (SendTypes.Call+"").equals(dataType) || (SendTypes.SMS+"").equals(dataType))){
|
|
|
+ type = dataType;
|
|
|
+ }
|
|
|
+
|
|
|
+ AlarmEntrty alarmEntrty = new AlarmEntrty(Routes.NoticePush, to, type, text);
|
|
|
+
|
|
|
+ map = alarmUtil.sendPost(alarmEntrty);
|
|
|
+
|
|
|
+ String data = (String) map.get("Reply");
|
|
|
+ if("OK".equals(data)){
|
|
|
+ String[] phones = alarmEntrty.getTo();
|
|
|
+ for (String phone : phones) {
|
|
|
+ AlarmLog alarmLog = new AlarmLog();
|
|
|
+ alarmLog.setRemark("预案ID/实验室ID: " + receivedTopic.replace(defaultTopic, ""));
|
|
|
+ alarmLog.setIsBack(0);
|
|
|
+ alarmLog.setStatus("失败");
|
|
|
+ alarmLog.setPhone(phone);
|
|
|
+ alarmLog.setNotice(alarmEntrty.getText());
|
|
|
+ alarmLog.setCreateTime(DateUtils.getNowDate());
|
|
|
+
|
|
|
+ alarmLogService.insertAlarmLog(alarmLog);
|
|
|
}
|
|
|
+ }else {
|
|
|
+ logger.error("报警电话推送错误:" + JSONObject.toJSONString(map));
|
|
|
}
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ logger.error("报警处理数据异常:" + e.getMessage());
|
|
|
}
|
|
|
- //redis存值 采集器编号:主机状态json
|
|
|
- redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
|
|
|
- //redis存值 灭火主机响应时间
|
|
|
- redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
|
|
|
- }*/
|
|
|
- };
|
|
|
- }
|
|
|
|
|
|
- public static String byte2Hex(byte[] bytes) {
|
|
|
- StringBuilder builder = new StringBuilder();
|
|
|
- String temp;
|
|
|
- for (byte aByte : bytes) {
|
|
|
- temp = Integer.toHexString(aByte & 0xFF);
|
|
|
- if (temp.length() == 1) {
|
|
|
- //1得到一位的进行补0操作
|
|
|
- builder.append("0");
|
|
|
}
|
|
|
- builder.append(temp);
|
|
|
- }
|
|
|
- return builder.toString();
|
|
|
+ };
|
|
|
}
|
|
|
-}
|
|
|
+}
|