|
|
@@ -5,6 +5,8 @@ import com.alibaba.fastjson.JSONObject;
|
|
|
import com.zd.algorithm.api.fire.RemoteFireDeviceService;
|
|
|
import com.zd.common.core.redis.RedisService;
|
|
|
import com.zd.common.core.utils.SpringUtils;
|
|
|
+import com.zd.laboratory.api.entity.OneClickFireDTO;
|
|
|
+import com.zd.laboratory.api.feign.RemoteLaboratoryService;
|
|
|
import com.zd.laboratory.event.LabMessageEvent;
|
|
|
import com.zd.laboratory.event.SensorNewStatusEvent;
|
|
|
import com.zd.laboratory.mqtt.service.TerminalRouter;
|
|
|
@@ -68,6 +70,8 @@ public class MqttConfig {
|
|
|
|
|
|
@Resource
|
|
|
RemoteFireDeviceService remoteFireDeviceService;
|
|
|
+ @Resource
|
|
|
+ RemoteLaboratoryService remoteLaboratoryService;
|
|
|
|
|
|
@Value("${mqtt.username:}")
|
|
|
private String username;
|
|
|
@@ -158,6 +162,37 @@ public class MqttConfig {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * 通道2
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Bean(name = MQTT_OUTBOUND_CHANNEL)
|
|
|
+ public MessageChannel mqttInputChannelTwo() {
|
|
|
+ return new DirectChannel();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 配置client2,监听的willTopic
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public MessageProducer inboundByte() {
|
|
|
+ // 可同时消费(订阅)多个Topic
|
|
|
+ adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
|
|
|
+ adapter.setCompletionTimeout(5000);
|
|
|
+// adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
+ adapter.setQos(2);
|
|
|
+
|
|
|
+ // 设置转换器,接收bytes
|
|
|
+ DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
|
|
|
+ converter.setPayloadAsBytes(true);
|
|
|
+ adapter.setConverter(converter);
|
|
|
+
|
|
|
+ // 设置订阅通道
|
|
|
+ adapter.setOutputChannel(mqttInboundChannel());
|
|
|
+ return adapter;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* 发送者消息处理
|
|
|
*
|
|
|
* @return
|
|
|
@@ -202,6 +237,81 @@ public class MqttConfig {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * 通过通道2获取数据
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
|
|
|
+ public MessageHandler handlerByte() {
|
|
|
+ logger.info("=======进入消息处理器2=====");
|
|
|
+ return message -> {
|
|
|
+ String receivedTopic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
|
|
|
+
|
|
|
+ try{
|
|
|
+ if(!(message.getPayload() instanceof byte[])){
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ logger.info("topic:"+receivedTopic);
|
|
|
+ if(receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)){
|
|
|
+
|
|
|
+ logger.info("消息处理器2:message:"+message.getPayload().toString());
|
|
|
+ //String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
|
|
|
+ String messageStr = bytes2HexString((byte[])message.getPayload());
|
|
|
+ logger.info("灭火消息,主机回复指令:"+messageStr);
|
|
|
+ //返回非指令时不向下执行
|
|
|
+ if(messageStr!=null && messageStr.length()<30){
|
|
|
+ logger.info("灭火消息指令不正确");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ //获取采集器编号
|
|
|
+ receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
|
|
|
+ //根据状态指令获取主机状态
|
|
|
+ JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
|
|
|
+ logger.info("灭火主机状态:"+jsonObject+"采集器编号"+receivedTopic+",响应时间key:"+CacheConstants.FIRE_DEVICE_RESPOND_TIME+receivedTopic);
|
|
|
+ //redis存值 采集器编号:主机状态json
|
|
|
+ redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
|
|
|
+ //redis存值 灭火主机响应时间
|
|
|
+ redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME+receivedTopic, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
|
|
|
+ logger.info("灭火消息,redis存值完毕========");
|
|
|
+
|
|
|
+ Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
|
|
|
+ //灭火主机发生预警
|
|
|
+ if (fireNumber > 0) {
|
|
|
+ logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
|
|
|
+ //发现灭火设备异常,发送消息通知小程序和PC
|
|
|
+ commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN+receivedTopic,jsonObject.toJSONString());
|
|
|
+ if (fireNumber == 2) {
|
|
|
+ logger.info("=====>>>>>>发现自动灭火");
|
|
|
+ //通知预案
|
|
|
+ //commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_CATCHFIRE+receivedTopic,jsonObject.toJSONString());
|
|
|
+ OneClickFireDTO dto = new OneClickFireDTO();
|
|
|
+ dto.setDeviceCode(receivedTopic);
|
|
|
+ dto.setIfFire(fireNumber);
|
|
|
+ remoteLaboratoryService.oneClickFire(dto);
|
|
|
+
|
|
|
+ String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
|
|
|
+ //redis中是否已存在日志已添加标识
|
|
|
+ if(StringUtils.isEmpty(isExist)){
|
|
|
+ //自动灭火日志添加
|
|
|
+ R r = remoteFireDeviceService.addFireLog(receivedTopic);
|
|
|
+ //redis存值,灭火日志已经添加标识
|
|
|
+ redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, r.getData(), 60L, TimeUnit.MINUTES);
|
|
|
+ logger.info("灭火日志添加执行结果," + r.getData());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
* 消费者消息处理
|
|
|
* 设备心跳
|
|
|
*
|
|
|
@@ -268,49 +378,7 @@ public class MqttConfig {
|
|
|
String relayCode = prefix[prefix.length - 1];
|
|
|
TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
|
|
|
}
|
|
|
- }}else if(receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)){
|
|
|
- String messageStrTest = byte2Hex(message.getPayload().toString().getBytes(StandardCharsets.UTF_8));
|
|
|
- logger.info("灭火消息返回,"+messageStrTest);
|
|
|
- logger.info("灭火消息消费,receivedTopic:" + receivedTopic + " messageStr:" + messageStr);
|
|
|
- //返回非指令时不向下执行
|
|
|
- if(receivedTopic!=null && receivedTopic.length()<30){
|
|
|
- logger.info("灭火消息指令不正确");
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- //获取采集器编号
|
|
|
- receivedTopic = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1, receivedTopic.length());
|
|
|
- //根据状态指令获取主机状态
|
|
|
- JSONObject jsonObject = FireLaborUtil.getFireStatus(messageStr);
|
|
|
- //redis存值 采集器编号:主机状态json
|
|
|
- redisService.setCacheObject(receivedTopic, jsonObject, 1L, TimeUnit.MINUTES);
|
|
|
- //redis存值 灭火主机响应时间
|
|
|
- redisService.setCacheObject(CacheConstants.FIRE_DEVICE_RESPOND_TIME+receivedTopic, System.currentTimeMillis(), 1L, TimeUnit.MINUTES);
|
|
|
- logger.info("灭火消息,redis存值完毕========");
|
|
|
-
|
|
|
- Integer fireNumber = Integer.valueOf(jsonObject.get("fireNumber").toString());
|
|
|
- //灭火主机发生预警
|
|
|
- if (fireNumber > 0) {
|
|
|
- logger.info("灭火设备传感器异常,通知预案,状态:" + jsonObject.toJSONString());
|
|
|
- //发现灭火设备异常,发送消息通知小程序和PC
|
|
|
- commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_WARN+receivedTopic,jsonObject.toJSONString());
|
|
|
- if (fireNumber == 2) {
|
|
|
- logger.info("=====>>>>>>发现自动灭火");
|
|
|
- //通知预案
|
|
|
- //commonSend.send(MqttConstants.TOPIC_FIRE_DEVICE_CATCHFIRE+receivedTopic,jsonObject.toJSONString());
|
|
|
- String isExist = redisService.getCacheObject(CacheConstants.FIRE_LOG_IS_EXIST);
|
|
|
- //redis中是否已存在日志已添加标识
|
|
|
- if(StringUtils.isEmpty(isExist)){
|
|
|
- //自动灭火日志添加
|
|
|
- R r = remoteFireDeviceService.addFireLog(receivedTopic);
|
|
|
- //redis存值,灭火日志已经添加标识
|
|
|
- redisService.setCacheObject(CacheConstants.FIRE_LOG_IS_EXIST, r.getData(), 60L, TimeUnit.MINUTES);
|
|
|
- logger.info("灭火日志添加执行结果," + r.getData());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ }}
|
|
|
// TODO
|
|
|
// MessageBody messageBody = JSONObject.parseObject(messageStr, MessageBody.class);
|
|
|
// mqttResHandler.deal(JSONUtil.toBean(msg,com.ffy.mqtt.model.Message.class));
|
|
|
@@ -322,6 +390,25 @@ public class MqttConfig {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * byte 数组转string
|
|
|
+ * @param b
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public static String bytes2HexString(byte[] b) {
|
|
|
+ String r = "";
|
|
|
+
|
|
|
+ for (int i = 0; i < b.length; i++) {
|
|
|
+ String hex = Integer.toHexString(b[i] & 0xFF);
|
|
|
+ if (hex.length() == 1) {
|
|
|
+ hex = '0' + hex;
|
|
|
+ }
|
|
|
+ r += hex.toUpperCase()+" ";
|
|
|
+ }
|
|
|
+
|
|
|
+ return r;
|
|
|
+ }
|
|
|
+
|
|
|
public static String byte2Hex(byte[] bytes) {
|
|
|
logger.info("byte长度:"+bytes.length);
|
|
|
StringBuilder builder = new StringBuilder();
|