|
@@ -11,8 +11,8 @@ import com.zd.laboratory.event.LabMessageEvent;
|
|
|
import com.zd.laboratory.event.SensorNewStatusEvent;
|
|
import com.zd.laboratory.event.SensorNewStatusEvent;
|
|
|
import com.zd.laboratory.mqtt.service.TerminalRouter;
|
|
import com.zd.laboratory.mqtt.service.TerminalRouter;
|
|
|
import com.zd.laboratory.mqtt.service.impl.CommonSend;
|
|
import com.zd.laboratory.mqtt.service.impl.CommonSend;
|
|
|
|
|
+import com.zd.laboratory.utils.CRCCHECK;
|
|
|
import com.zd.laboratory.utils.FireLaborUtil;
|
|
import com.zd.laboratory.utils.FireLaborUtil;
|
|
|
-import com.zd.laboratory.utils.HexUtils;
|
|
|
|
|
import com.zd.model.constant.CacheConstants;
|
|
import com.zd.model.constant.CacheConstants;
|
|
|
import com.zd.model.constant.MqttConstants;
|
|
import com.zd.model.constant.MqttConstants;
|
|
|
import com.zd.model.domain.R;
|
|
import com.zd.model.domain.R;
|
|
@@ -35,7 +35,8 @@ import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
import org.springframework.messaging.*;
|
|
import org.springframework.messaging.*;
|
|
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
import javax.annotation.Resource;
|
|
|
-import java.nio.charset.StandardCharsets;
|
|
|
|
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
@@ -202,8 +203,9 @@ public class MqttConfig {
|
|
|
@Bean
|
|
@Bean
|
|
|
@ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
|
|
@ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
|
|
|
public MessageHandler mqttOutbound() {
|
|
public MessageHandler mqttOutbound() {
|
|
|
|
|
+ int num = new Random().nextInt(998)+1;
|
|
|
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
|
|
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
|
|
|
- producerClientId,
|
|
|
|
|
|
|
+ producerClientId +"_"+ num,
|
|
|
mqttClientFactory());
|
|
mqttClientFactory());
|
|
|
messageHandler.setAsync(true);
|
|
messageHandler.setAsync(true);
|
|
|
messageHandler.setDefaultTopic(defaultTopic);
|
|
messageHandler.setDefaultTopic(defaultTopic);
|
|
@@ -217,8 +219,9 @@ public class MqttConfig {
|
|
|
*/
|
|
*/
|
|
|
@Bean
|
|
@Bean
|
|
|
public MessageProducer inbound() {
|
|
public MessageProducer inbound() {
|
|
|
|
|
+ int num = new Random().nextInt(98)+1;
|
|
|
// 可同时消费(订阅)多个Topic
|
|
// 可同时消费(订阅)多个Topic
|
|
|
- adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
|
|
|
|
|
|
|
+ adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId+num, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
|
|
|
adapter.setCompletionTimeout(5000);
|
|
adapter.setCompletionTimeout(5000);
|
|
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
adapter.setQos(2);
|
|
adapter.setQos(2);
|
|
@@ -279,13 +282,25 @@ public class MqttConfig {
|
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
|
MessageHeaders messageHeaders = message.getHeaders();
|
|
MessageHeaders messageHeaders = message.getHeaders();
|
|
|
String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
|
|
String receivedTopic = (String) messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
|
|
|
- //logger.info("[通道] - [{}]", receivedTopic);
|
|
|
|
|
- //logger.info("[消息] - [{}]", message.getPayload());
|
|
|
|
|
|
|
+// logger.info("[通道] - [{}],[{}]", receivedTopic,message.getPayload());
|
|
|
String messageStr = message.getPayload().toString();
|
|
String messageStr = message.getPayload().toString();
|
|
|
if (receivedTopic.startsWith(devicePrefix)) {
|
|
if (receivedTopic.startsWith(devicePrefix)) {
|
|
|
if (receivedTopic.contains("788D4C6C6187ABC")) {
|
|
if (receivedTopic.contains("788D4C6C6187ABC")) {
|
|
|
logger.info("[原始消息] - [{}]", message.getPayload());
|
|
logger.info("[原始消息] - [{}]", message.getPayload());
|
|
|
}
|
|
}
|
|
|
|
|
+ if ((message.getPayload() instanceof byte[])) {
|
|
|
|
|
+ messageStr = "";
|
|
|
|
|
+ try {
|
|
|
|
|
+ ByteArrayInputStream bis = new ByteArrayInputStream((byte[]) message.getPayload());
|
|
|
|
|
+ byte[] buffer = new byte[1024];
|
|
|
|
|
+ int len = -1;
|
|
|
|
|
+ while((len = bis.read(buffer)) != -1){
|
|
|
|
|
+ messageStr+= new String(buffer);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ logger.error("传感器数据转换异常!");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
String topicEnd = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1);
|
|
String topicEnd = receivedTopic.substring(receivedTopic.lastIndexOf("/") + 1);
|
|
|
JSONObject jsonObject = JSONObject.parseObject(messageStr);
|
|
JSONObject jsonObject = JSONObject.parseObject(messageStr);
|
|
|
jsonObject.put("deviceNo", topicEnd);
|
|
jsonObject.put("deviceNo", topicEnd);
|
|
@@ -303,10 +318,18 @@ public class MqttConfig {
|
|
|
TerminalRouter.routerMap.get("HxpTerminalService").offLine(codeNum);
|
|
TerminalRouter.routerMap.get("HxpTerminalService").offLine(codeNum);
|
|
|
} else if (receivedTopic.startsWith(steerPublishPrefix)) {
|
|
} else if (receivedTopic.startsWith(steerPublishPrefix)) {
|
|
|
// messageStr = byte2Hex(message.getPayload().toString().getBytes(StandardCharsets.UTF_8));
|
|
// messageStr = byte2Hex(message.getPayload().toString().getBytes(StandardCharsets.UTF_8));
|
|
|
- char[] str = message.getPayload().toString().toCharArray();
|
|
|
|
|
- logger.info("柜锁mqtt消费:" + messageStr);
|
|
|
|
|
- String replaceMess = messageStr.replace(" ", "");
|
|
|
|
|
|
|
+// char[] str = message.getPayload().toString().toCharArray();
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ // 柜锁bit 位
|
|
|
|
|
+ long bit = CRCCHECK.getBitByCommand(messageStr);
|
|
|
|
|
+ // 1开锁状态 0关锁状态
|
|
|
|
|
+ int status = CRCCHECK.getLockStatus(messageStr);
|
|
|
|
|
+ logger.info("柜锁MQTT状态回调:" + receivedTopic.replaceAll(steerPublishPrefix +"/", "") + ":" + bit + ",回调结果" + (status == 1 ? "开启": "关闭") + ",指令:" + messageStr);
|
|
|
|
|
+ redisService.setCacheObject(receivedTopic.replaceAll(steerPublishPrefix +"/", "") + ":" + bit, status, 3 * 60L, TimeUnit.SECONDS);
|
|
|
|
|
+
|
|
|
//坨机写指令返回数据
|
|
//坨机写指令返回数据
|
|
|
|
|
+ /*String replaceMess = messageStr.replace(" ", "");
|
|
|
if (replaceMess.length() == 12) {
|
|
if (replaceMess.length() == 12) {
|
|
|
String command = replaceMess.substring(8, 10);
|
|
String command = replaceMess.substring(8, 10);
|
|
|
//表示坨机成功
|
|
//表示坨机成功
|
|
@@ -331,7 +354,7 @@ public class MqttConfig {
|
|
|
String relayCode = prefix[prefix.length - 1];
|
|
String relayCode = prefix[prefix.length - 1];
|
|
|
TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
|
|
TerminalRouter.routerMap.get("HxpLockService").offLine(relayCode);
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
+ }*/
|
|
|
} else if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
|
|
} else if (receivedTopic.startsWith(MqttConstants.TOPIC_FIRE_DEVICE_RECEIVE)) {
|
|
|
logger.info("消息处理器1开始处理=====");
|
|
logger.info("消息处理器1开始处理=====");
|
|
|
logger.info("topic:" + receivedTopic);
|
|
logger.info("topic:" + receivedTopic);
|