Преглед изворни кода

MQTT ClientId重复报错断链处理及优化

linfutong пре 2 година
родитељ
комит
c6c14e9242

+ 4 - 10
zd-modules/zd-algorithm/src/main/java/com/zd/alg/forward/serivce/FireImageService.java

@@ -63,23 +63,17 @@ public class FireImageService {
     public void camera(){
         try {
             String streamUrl = fireProperties.getStreamUrl();
-            if (streamUrl == null) {
-                log.error("=========调用产生异常:未配置流媒体地址============");
-                return;
+            if (streamUrl != null && !"".equals(streamUrl)) {
+                catchImage(streamUrl);
             }
-            catchImage();
+            //log.error("=========调用产生异常:未配置流媒体地址============");
         } catch (ServiceException | IOException e) {
             //异常回调,防止系统因异常问题被杀死
             log.error("=========调用产生异常:{}============", e.getMessage());
         }
     }
 
-    public void catchImage() throws IOException {
-        String streamUrl = fireProperties.getStreamUrl();
-        if (streamUrl == null) {
-            throw new ServiceException("未配置流媒体地址");
-        }
-
+    public void catchImage(String streamUrl) throws IOException {
         try (FFmpegFrameGrabber grabber = VideoUtils.createGrabber(streamUrl)) {
             grabber.start();
             String fileName = "test";

+ 5 - 6
zd-modules/zd-algorithm/src/main/java/com/zd/alg/mqtt/MqttConfig.java

@@ -31,10 +31,7 @@ import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.MessagingException;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 /**
  * mqtt配置
@@ -132,7 +129,8 @@ public class MqttConfig {
     @Bean
     @ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
     public MessageHandler mqttOutbound() {
-        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(producerClientId, mqttClientFactory());
+        int num = new Random().nextInt(998)+1;
+        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(producerClientId+"_"+num, mqttClientFactory());
         messageHandler.setAsync(true);
         messageHandler.setDefaultTopic(defaultTopic);
         return messageHandler;
@@ -144,7 +142,8 @@ public class MqttConfig {
     @Bean
     public MessageProducer inbound() {
         // 可同时消费(订阅)多个Topic
-        adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
+        int num = new Random().nextInt(98)+1;
+        adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId+num, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(2);

+ 0 - 57
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/mqtt/TestController.java

@@ -1,57 +0,0 @@
-package com.zd.laboratory.mqtt;
-
-import com.zd.laboratory.mqtt.enums.BigViewDataType;
-import com.zd.laboratory.mqtt.service.impl.SubMessageSendManager;
-import io.swagger.annotations.Api;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * @author zpp
- * @date 2019/8/26 18:19
- */
-@RestController
-@ResponseBody
-@Api(tags = "mqtt测试")
-@RequestMapping("/mqtt")
-public class TestController {
-
-    @Autowired
-    private SubMessageSendManager messageSendService;
-
-    @GetMapping("/send")
-    public String send(){
-        messageSendService.SendBigViewUpdate(BigViewDataType.SUB_ONLINE);
-        return "success";
-    }
-
-//    @GetMapping("/sendry/{msg}")
-//    public String sends(@PathVariable String msg){
-//        String topic="reming/heartssss";
-//        JSONObject jsonObject=new JSONObject();
-//        jsonObject.put("name","张三");
-//        jsonObject.put("sex","男");
-//        jsonObject.put("phone","13324654152");
-//        mqttProducer.send(topic,0,jsonObject.toJSONString());
-//        return "success";
-//    }
-
-
-
-    //Integer trackX,Integer trackY,String macNo
-
-//    @GetMapping("/pay")
-//    public String pay(@RequestParam (value = "orderNo") String orderNo,
-//                      @RequestParam (value = "trackX") Integer trackX,
-//                      @RequestParam (value = "trackY") Integer trackY,
-//                      @RequestParam (value = "macNo") String macNo,
-//                      @RequestParam (value = "type") Integer type,
-//                       @RequestParam (value = "number") Integer number
-//    ){
-//        messageSendService.SendPaySuccess(orderNo,trackX,trackY,macNo,type,number);
-//        return "success";
-//    }
-}

+ 4 - 2
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/mqtt/config/MqttConfig.java

@@ -202,8 +202,9 @@ public class MqttConfig {
     @Bean
     @ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
     public MessageHandler mqttOutbound() {
+        int num = new Random().nextInt(998)+1;
         MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
-                producerClientId,
+                producerClientId +"_"+ num,
                 mqttClientFactory());
         messageHandler.setAsync(true);
         messageHandler.setDefaultTopic(defaultTopic);
@@ -217,8 +218,9 @@ public class MqttConfig {
      */
     @Bean
     public MessageProducer inbound() {
+        int num = new Random().nextInt(98)+1;
         // 可同时消费(订阅)多个Topic
-        adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
+        adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId+num, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(2);

+ 0 - 4
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/mqtt/controller/MqttController.java

@@ -1,16 +1,12 @@
 package com.zd.laboratory.mqtt.controller;
 
-import com.alibaba.fastjson.JSONObject;
 import com.zd.model.enums.HardwareOperate;
 import com.zd.common.core.utils.SpringUtils;
 import com.zd.model.domain.ResultData;
-import com.zd.laboratory.domain.LabHardware;
-import com.zd.laboratory.event.RelayHardwareStatusEvent;
 import com.zd.laboratory.event.VideoHardwareStatusEvent;
 import com.zd.laboratory.mapper.LabHardwareMapper;
 import com.zd.laboratory.mqtt.MqttProducer;
 import com.zd.laboratory.mqtt.entiy.EquipmentStatus;
-import com.zd.laboratory.service.ILabHardwareService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 

+ 0 - 36
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/mqtt/test/MqttTest.java

@@ -1,36 +0,0 @@
-package com.zd.laboratory.mqtt.test;
-
-import com.zd.laboratory.config.HardwareFunctionStatusConfig;
-import com.zd.laboratory.mqtt.service.impl.SubMessageSendManager;
-import org.springframework.boot.context.event.ApplicationStartedEvent;
-import org.springframework.context.ApplicationListener;
-import org.springframework.context.annotation.Profile;
-import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
-import org.springframework.stereotype.Component;
-
-/**
- * @Author: zhoupan
- * @Date: 2021/10/08/13:09
- * @Description:
- */
-
-@Profile("dev")
-@Component
-public class MqttTest implements ApplicationListener<ApplicationStartedEvent> {
-
-
-
-    @Override
-    public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
-//        MqttPahoMessageDrivenChannelAdapter messageProducer = (MqttPahoMessageDrivenChannelAdapter)applicationStartedEvent.getApplicationContext().getBeanFactory().getBean("inbound");
-//        //添加主题
-//        messageProducer.addTopic(MqttConstants.LAB_FUNCTION_DATA+-1L);
-//
-//        SubMessageSendManager bean = applicationStartedEvent.getApplicationContext().getBeanFactory().getBean(SubMessageSendManager.class);
-//        HardwareFunctionStatusConfig function = applicationStartedEvent.getApplicationContext().getBeanFactory().getBean(HardwareFunctionStatusConfig.class);
-//        //发送一个测试测点数据
-//        bean.sendFunctionUpdate(-1L,function.getSensorFunctionStatuses());
-
-
-    }
-}