Procházet zdrojové kódy

异步处理预案,喇叭处理循环播放,喇叭处理查询条件

hanzhiwei před 2 roky
rodič
revize
018cf470d4

+ 7 - 0
zd-api/zd-algorithm-api/src/main/java/com/zd/algorithm/api/speaker/feign/RemoteSpeakService.java

@@ -56,6 +56,13 @@ public interface RemoteSpeakService {
       R textParseUrlAppIps(@RequestParam("text") String text, @RequestBody List<PlayVo> playVo);
 
       /***
+       * 语音播放
+       * @return
+       */
+      @RequestMapping("/speaker/textMoreParseUrlAppIps")
+      R textMoreParseUrlAppIps(@RequestParam("text") String text, @RequestBody List<PlayVo> playVo,@RequestParam("playNum")Integer playNum);
+
+      /***
        * 关闭喇叭
        * @return
        */

+ 6 - 0
zd-api/zd-algorithm-api/src/main/java/com/zd/algorithm/api/speaker/feign/fallback/RemoteSpeakFallbackFactory.java

@@ -52,6 +52,12 @@ public class RemoteSpeakFallbackFactory implements FallbackFactory<RemoteSpeakSe
             }
 
             @Override
+            public R textMoreParseUrlAppIps(String text, List<PlayVo> playVo, Integer playNum) {
+                log.info("语音播放失败:" + cause.getMessage());
+                return R.fail("textParseUrlIps:语音播放失败"+ cause.getMessage());
+            }
+
+            @Override
             public R stopPlayMusic(String text, String ip) {
                 log.info("关闭喇叭失败:" + cause.getMessage());
                 return R.fail("textParseUrlIps:关闭喇叭失败"+ cause.getMessage());

+ 10 - 5
zd-modules/zd-algorithm/src/main/java/com/zd/alg/forward/serivce/SendSginAccessLogService.java

@@ -9,6 +9,7 @@ import com.zd.algorithm.api.speaker.entity.PlayVo;
 import com.zd.algorithm.api.speaker.feign.RemoteSpeakService;
 import com.zd.common.core.exception.PreAuthorizeException;
 import com.zd.common.core.utils.StringUtils;
+import com.zd.laboratory.api.feign.RemoteLaboratoryService;
 import com.zd.model.constant.CacheDevice;
 import com.zd.model.constant.HttpStatus;
 import com.zd.model.constant.SecurityConstants;
@@ -26,10 +27,7 @@ import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -47,6 +45,8 @@ public class SendSginAccessLogService {
     private final AlgorithmYml algorithmYml;
 
     private final RemoteSpeakService remoteSpeakService;
+    @Autowired
+    private RemoteLaboratoryService remoteLaboratoryService;
 
     /**
      * 报警次数记录
@@ -148,7 +148,12 @@ public class SendSginAccessLogService {
         }
         String loudspeakerIp1 = algorithmYml.getLoudspeakerIp1();
         String loudspeakerIp2 = algorithmYml.getLoudspeakerIp2();
-        R deviceList = remoteSpeakService.getDeviceList(1, 100, 5L);
+        R<Object> objectR = remoteLaboratoryService.selectSpeakerCount();
+        Integer count = 0;
+        if (!Objects.isNull(objectR) && objectR.getCode() == (HttpStatus.SUCCESS)){
+            count = Integer.parseInt(String.valueOf(objectR.getData()));
+        }
+        R deviceList = remoteSpeakService.getDeviceList(1, count + 10, 5L);
         if (deviceList.getCode() == HttpStatus.SUCCESS) {
             List<Map<String, Object>> mapList = (List<Map<String, Object>>) deviceList.getData();
             for (Map<String, Object> map : mapList) {

+ 2 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/ZdLaboratoryApplication.java

@@ -10,6 +10,7 @@ import com.zd.model.constant.BaseConstants;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 /**
@@ -23,6 +24,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 @EnableScheduling
 @ComponentScan(basePackages = BaseConstants.BASE_PACKAGE)
 @MapperScan(value = {"com.zd.**.mapper"})
+@EnableAsync
 public class ZdLaboratoryApplication {
     public static void main(String[] args) {
         HaiKangDoorService.skdInit = AcsBase.sdk_init();

+ 43 - 43
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/config/ExecutorConfig.java

@@ -1,43 +1,43 @@
-package com.zd.laboratory.config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-
-/**
- * @Author: zhoupan
- * @Date: 2021/11/17/14:39
- * @Description:
- */
-@Configuration
-public class ExecutorConfig {
-
-
-    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
-
-    @Bean(name = "labExecutor")
-    public Executor asyncServiceExecutor() {
-        logger.info("start asyncServiceExecutor");
-        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        //配置核心线程数
-        executor.setCorePoolSize(5);
-        //配置最大线程数
-        executor.setMaxPoolSize(8);
-        //配置队列大小
-        executor.setQueueCapacity(99999);
-        //配置线程池中的线程的名称前缀
-        executor.setThreadNamePrefix("async-service-");
-
-        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
-        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-        //执行初始化
-        executor.initialize();
-        return executor;
-    }
-}
+//package com.zd.laboratory.config;
+//
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+//
+//import java.util.concurrent.Executor;
+//import java.util.concurrent.ThreadPoolExecutor;
+//
+///**
+// * @Author: zhoupan
+// * @Date: 2021/11/17/14:39
+// * @Description:
+// */
+//@Configuration
+//public class ExecutorConfig {
+//
+//
+//    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
+//
+//    @Bean(name = "labExecutor")
+//    public Executor asyncServiceExecutor() {
+//        logger.info("start asyncServiceExecutor");
+//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+//        //配置核心线程数
+//        executor.setCorePoolSize(5);
+//        //配置最大线程数
+//        executor.setMaxPoolSize(8);
+//        //配置队列大小
+//        executor.setQueueCapacity(99999);
+//        //配置线程池中的线程的名称前缀
+//        executor.setThreadNamePrefix("async-service-");
+//
+//        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
+//        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
+//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+//        //执行初始化
+//        executor.initialize();
+//        return executor;
+//    }
+//}

+ 48 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/config/SyncConfiguration.java

@@ -0,0 +1,48 @@
+package com.zd.laboratory.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+/**
+ * @Description 异步线程池配置
+ * @Author hzw
+ * @Date 2023/1/31 14:38
+ * @Version 2.0
+ */
+@Slf4j
+@Configuration
+@EnableAsync
+public class SyncConfiguration {
+    @Bean(name = "asyncPoolTaskExecutor")
+    public ThreadPoolTaskExecutor executor() {
+        log.info("lab服务异步线程池配置!");
+        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+        //核心线程数
+        taskExecutor.setCorePoolSize(10);
+        //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
+        taskExecutor.setMaxPoolSize(100);
+        //缓存队列
+        taskExecutor.setQueueCapacity(50);
+        //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
+        taskExecutor.setKeepAliveSeconds(200);
+        //异步方法内部线程名称
+        taskExecutor.setThreadNamePrefix("async-labService-");
+        /**
+         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
+         * 通常有以下四种策略:
+         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
+         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
+         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
+         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
+         */
+        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        taskExecutor.initialize();
+        return taskExecutor;
+    }
+}

+ 1 - 1
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/service/impl/LabMessageContentServiceImpl.java

@@ -176,7 +176,7 @@ public class LabMessageContentServiceImpl implements ILabMessageContentService {
      * @param subId       实验室ID
      */
     @Override
-    @Async("labExecutor")
+    @Async("asyncPoolTaskExecutor")
     public void sendWranMessage(Long subId,LabRiskPlanLevel labRiskPlanLevel, Long groupId) {
         //获取实验室负责人信息
         LabSubject subject = new LabSubject();

+ 2 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/service/impl/LabRiskPlanLevelServiceImpl.java

@@ -15,6 +15,7 @@ import com.zd.laboratory.service.LabRiskPlanLevelService;
 import com.zd.model.domain.ResultData;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -36,6 +37,7 @@ public class LabRiskPlanLevelServiceImpl extends ServiceImpl<LabRiskPlanLevelMap
     private ILabRiskPlanHardwareRelationService labRiskPlanHardwareRelationService;
 
     @Autowired
+    @Lazy
     private ILabRiskPlanService labRiskPlanService;
 
 

+ 21 - 22
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/service/impl/LabRiskPlanServiceImpl.java

@@ -47,6 +47,7 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -114,6 +115,7 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
     private LabAudioSynthesisMapper labAudioSynthesisMapper;
 
     @Autowired
+    @Lazy
     private LabRiskPlanLevelService labRiskPlanLevelService;
 
     @Autowired
@@ -143,6 +145,9 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
     @Autowired
     private ILabBuildFloorLayoutService labBuildFloorLayoutService;
 
+    @Autowired
+    private ILabSparseHardwareService labSparseHardwareService;
+
     private static final Logger log = LoggerFactory.getLogger(LabRiskPlanServiceImpl.class);
 
 
@@ -804,7 +809,9 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
             log.info("关闭喇叭-查询的音乐列表信息 audioSyntheses= {}", JSON.toJSONString(audioSyntheses));
             if (audioSyntheses != null && audioSyntheses.size() > 0) {
                 log.info("关闭喇叭-远程调用查询喇叭列表,楼层id={},实验室id={}", floorId, subjectid);
-                R deviceList = remoteSpeakService.getDeviceList(1, 100, floorId, subjectid);
+                Integer count = labSparseHardwareService.selectSpeakerCount();
+                log.info("查询喇叭总数:{}",count);
+                R deviceList = remoteSpeakService.getDeviceList(1, count + 10, floorId, subjectid);
                 log.info("关闭喇叭-远程调用喇叭列表返回内容: deviceList={}", JSON.toJSONString(deviceList));
                 if (deviceList.getCode() == 200) {
                     List<Map<String, Object>> mapList = (List<Map<String, Object>>) deviceList.getData();
@@ -1000,7 +1007,9 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
             if (audioSyntheses != null && audioSyntheses.size() > 0) {
                 //预案调用喇叭
                 log.info("打开喇叭-远程调用查询喇叭列表,楼层id={},实验室id={}", floorId, subjectId);
-                R deviceList = remoteSpeakService.getDeviceList(1, 100, floorId, subjectId);
+                Integer count = labSparseHardwareService.selectSpeakerCount();
+                log.info("查询喇叭总数:{}",count);
+                R deviceList = remoteSpeakService.getDeviceList(1, count + 10, floorId, subjectId);
                 log.info("打开喇叭-远程调用喇叭列表返回内容: deviceList={}", JSON.toJSONString(deviceList));
                 if (deviceList.getCode() == 200) {
                     List<Map<String, Object>> mapList = (List<Map<String, Object>>) deviceList.getData();
@@ -1016,7 +1025,7 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
                             playVo.setParams(paramVo);
                             playVoList.add(playVo);
                             log.info("打开喇叭-远程调用喇叭播放音乐!url={},playVoList={}", audioSyntheses.get(0).getNewMusicUrl(), JSON.toJSONString(playVoList));
-                            R r = remoteSpeakService.textParseUrlAppIps(audioSyntheses.get(0).getNewMusicUrl(), playVoList);
+                            R r = remoteSpeakService.textMoreParseUrlAppIps(audioSyntheses.get(0).getNewMusicUrl(), playVoList,1000);
                             log.info("打开喇叭-远程调用喇叭播放音乐返回信息:{}", JSON.toJSONString(r));
                         } else {
                             log.info("打开喇叭-喇叭deviceSn/port为空!");
@@ -1042,7 +1051,7 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
     private void startVideo(Long subjectId) {
         try {
             LabHardware labHardware = labHardwareService.selectLabHardwareCameraBySub(subjectId);
-            if (labHardware != null || labHardware.getIpAddress() != null) {
+            if (labHardware != null && labHardware.getIpAddress() != null) {
                 //开始录制视频
                 log.info("远程调用开始录制视频ip地址={}", labHardware.getIpAddress());
                 remoteCameraService.stopRecord(labHardware.getIpAddress());
@@ -1057,7 +1066,7 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
                 log.info("未查询到实验室关联摄像头信息!请检查ip是否为空!");
             }
         } catch (Exception e) {
-            log.error("远程调用开始录制视频接口失败!{}", e);
+            log.error("远程调用开始录制视频接口异常!{}", e);
         }
     }
 
@@ -1365,15 +1374,8 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
                     labHardware.setSubjectId(subjectId);
                     labHardware.setType(HardwareTypeEnum.getByCode(riskPlanHardwareList.get(i).getHardwareType()));
                     List<LabHardware> hardwareList = labHardwareMapper.selectLabHardwareListBySubject(labHardware);
-                    //循环执行实验室下的硬件功能
-                    for (LabHardware hard : hardwareList) {
-                        ResultData result = labSubjectManagerService.excutingComm(hard, status);
-                        try {
-                            Thread.sleep(300);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    }
+                    //异步循环执行实验室下的硬件功能
+                    labSubjectManagerService.operationHardware(status, hardwareList);
                 }
             }
             log.error("打开硬件记录风险日志");
@@ -1401,15 +1403,12 @@ public class LabRiskPlanServiceImpl extends ServiceImpl<LabRiskPlanMapper, LabRi
                     labHardware.setSubjectId(subjectId);
                     labHardware.setType(HardwareTypeEnum.getByCode(riskPlanHardwareList.get(i).getHardwareType()));
                     List<LabHardware> hardwareList = labHardwareMapper.selectLabHardwareListBySubject(labHardware);
-                    //循环执行实验室下的硬件功能
-                    for (LabHardware hard : hardwareList) {
-                        labSubjectManagerService.excutingComm(hard, status);
-                        try {
-                            Thread.sleep(300);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
+                    if (hardwareList.isEmpty()) {
+                        log.info("查询实验室硬件信息为空,跳过此实验室!subjectId={}",subjectId);
+                        continue;
                     }
+                    //异步循环执行实验室下的硬件功能
+                    labSubjectManagerService.operationHardware(status,hardwareList);
                 }
             }
         }

+ 14 - 1
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/service/impl/LabSubjectManagerService.java

@@ -122,6 +122,19 @@ public class LabSubjectManagerService {
 
     }
 
+    @Async("asyncPoolTaskExecutor")
+    public void operationHardware(FunctionStatus status, List<LabHardware> hardwareList) {
+        try {
+            logger.info("异步线程处理硬件信息:id={},name={}",Thread.currentThread().getId(),Thread.currentThread().getName());
+            for (LabHardware hard : hardwareList) {
+                ResultData result = excutingComm(hard, status);
+                Thread.sleep(500);
+            }
+        } catch (InterruptedException e) {
+            logger.error("异步处理硬件异常!",e);
+        }
+    }
+
     /**
      * 设备-执行命令
      *
@@ -164,7 +177,7 @@ public class LabSubjectManagerService {
             return ResultData.fail("继电器指令错误!!");
         }
 
-        logger.info("预案调用硬件,硬件id={},硬件指令={}",labHardware.getId(), JSON.toJSONString(command));
+        logger.info("预案调用硬件,硬件id={},硬件名称={},硬件指令={}",labHardware.getId(), JSON.toJSONString(labHardware.getName()),JSON.toJSONString(command));
         return ResultData.success(control(labHardware.getId(), command));
 
         //老继电器调用方法