Przeglądaj źródła

2023-3-15 测试定时排风消息消费。

chaiyunlong 2 lat temu
rodzic
commit
6b0fe59d1b

+ 3 - 0
zd-model/src/main/java/com/zd/model/constant/BaseConstants.java

@@ -183,4 +183,7 @@ public interface BaseConstants {
      * 手机登录验证码有效期(分钟)
      */
     long CODE_EXPIRATION = 5;
+
+
+    String DELAY_QUEUE = "delayQueue";
 }

+ 30 - 4
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/controller/LabAbnormalController.java

@@ -1,16 +1,21 @@
 package com.zd.laboratory.controller;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import com.zd.common.core.annotation.Log;
 import com.zd.common.core.annotation.PreAuthorize;
 import com.zd.common.core.log.BusinessType;
+import com.zd.common.core.redis.RedisService;
 import com.zd.common.core.utils.DictUtils;
 import com.zd.common.core.utils.ExcelUtil;
 import com.zd.common.core.web.controller.BaseController;
 import com.zd.laboratory.domain.LabAbnormal;
 import com.zd.laboratory.domain.vo.LabAbnormalVO;
+import com.zd.laboratory.domain.vo.LabDealyNotifyVo;
 import com.zd.laboratory.service.ILabAbnormalService;
 import com.zd.laboratory.socket.command.Symbol;
 import com.zd.laboratory.socket.service.SocketService;
+import com.zd.model.constant.BaseConstants;
 import com.zd.model.domain.ResultData;
 import com.zd.model.domain.per.PerFun;
 import com.zd.model.domain.per.PerPrefix;
@@ -23,10 +28,8 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.time.LocalDate;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 异常设备Controller
@@ -42,6 +45,9 @@ public class LabAbnormalController extends BaseController {
     @Autowired
     private SocketService socketService;
 
+    @Autowired
+    private RedisService redisService;
+
     @GetMapping("/send")
     public ResultData send(int num, String deviceNum, int type) {
 
@@ -187,4 +193,24 @@ public class LabAbnormalController extends BaseController {
     public ResultData getAbnormalCount(LabAbnormalVO labAbnormal) {
         return ResultData.success(labAbnormalService.getAbnormalCount(labAbnormal));
     }
+
+    @GetMapping("/sendMsg")
+    public ResultData sendMsg() {
+        LabDealyNotifyVo labDealyNotifyVo = new LabDealyNotifyVo();
+        labDealyNotifyVo.setRandomNum(UUID.randomUUID().toString());
+        labDealyNotifyVo.setHardwareNum("123455");
+        labDealyNotifyVo.setOpenOrCloseType(1);
+        JSONObject jsonObj = new JSONObject();
+        jsonObj.put(BaseConstants.DELAY_QUEUE,labDealyNotifyVo);
+        redisService.setCacheObject(BaseConstants.DELAY_QUEUE+"~"+ jsonObj,jsonObj, 10L, TimeUnit.SECONDS);
+
+        LabDealyNotifyVo labDealyNotifyVo2 = new LabDealyNotifyVo();
+        labDealyNotifyVo2.setRandomNum(UUID.randomUUID().toString());
+        labDealyNotifyVo2.setHardwareNum("123455");
+        labDealyNotifyVo2.setOpenOrCloseType(0);
+        JSONObject jsonObj2 = new JSONObject();
+        jsonObj2.put(BaseConstants.DELAY_QUEUE,labDealyNotifyVo2);
+        redisService.setCacheObject(BaseConstants.DELAY_QUEUE+"~"+ jsonObj2,jsonObj2, 20L, TimeUnit.SECONDS);
+        return null;
+    }
 }

+ 30 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/domain/vo/LabDealyNotifyVo.java

@@ -0,0 +1,30 @@
+package com.zd.laboratory.domain.vo;
+
+import com.zd.laboratory.domain.LabBuildFloor;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+/**
+ * Controller
+ *
+ * @author cyl
+ * @date 2023/3/14
+ */
+@ApiModel("延迟通知消息")
+@Data
+public class LabDealyNotifyVo {
+
+    /** 随机数,保证同样的继电器不重复 */
+    @ApiModelProperty(value = "随机数")
+    private String randomNum;
+
+    /** 继电器编号 */
+    @ApiModelProperty(value = "继电器编号")
+    private String hardwareNum;
+
+    /** 开启关闭状态 */
+    @ApiModelProperty(value = "开启关闭状态")
+    private Integer openOrCloseType;
+
+}

+ 63 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/event/RedisExpiredAndWorkListener.java

@@ -0,0 +1,63 @@
+package com.zd.laboratory.event;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.alibaba.fastjson.parser.Feature;
+import com.zd.laboratory.domain.vo.LabDealyNotifyVo;
+import com.zd.laboratory.domain.vo.LabExitLineJoinPointVO;
+import com.zd.laboratory.event.handle.EventHandler;
+import com.zd.model.constant.BaseConstants;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * 当前由于没有消息中间件,所以只能暂时采用此种方式直接消费数据
+ * 而且由于服务端空间问题,高哥没让建新的项目
+ * 所以现在存在的问题:
+ * 1,发布订阅模式无法解决消息竞争的问题,当前通过服务去监听redis key 变动,实际是基于发布订阅模式,导致微服务存在状态,
+ * 当实例启动越多,会对消息处理存在重复处理的情况,应用的水平扩展存在问题,目前该类中消息处理的内容还是幂等的,问题不是很大
+ * 解决方法
+ * 1,将监听还有socket 等独立出来作为单体应用,剔除对服务状态有影响的部分,(缺点,该部分变成了单节点) 通过消息中间件 异步发送消息,
+ * 采用生产者消费者模式消费数据
+ *
+ *
+ *
+ * @Author: zhoupan
+ * @Date: 2021/10/20/11:19
+ * @Description:
+ */
+@Component
+public class RedisExpiredAndWorkListener extends KeyExpirationEventMessageListener {
+
+    public RedisExpiredAndWorkListener(RedisMessageListenerContainer listenerContainer)
+    {
+        super(listenerContainer);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+//        String topic = new String(bytes);
+        String expiredKey = new String(message.getBody());
+        if(expiredKey.indexOf(BaseConstants.DELAY_QUEUE)!=-1){
+            String[] delayStr = expiredKey.split(BaseConstants.DELAY_QUEUE+"~");
+            JSONObject jsonObj = (JSONObject) JSONObject.parseObject(delayStr[1], Object.class, Feature.OrderedField);
+            LabDealyNotifyVo dealyNotifyVo = jsonObj.getObject(BaseConstants.DELAY_QUEUE, new TypeReference <LabDealyNotifyVo>(){});
+            System.out.println("测试继电器编号:"+dealyNotifyVo.getHardwareNum());
+            System.out.println("测试随机数:"+dealyNotifyVo.getRandomNum());
+            System.out.println("测试开启或关闭::::"+dealyNotifyVo.getOpenOrCloseType());
+        }
+
+    }
+
+}