|
|
@@ -0,0 +1,120 @@
|
|
|
+package com.zd.laboratory.event;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.alibaba.fastjson.TypeReference;
|
|
|
+import com.alibaba.fastjson.parser.Feature;
|
|
|
+import com.zd.laboratory.domain.LabControl;
|
|
|
+import com.zd.laboratory.domain.LabHardware;
|
|
|
+import com.zd.laboratory.domain.vo.LabDealyNotifyVo;
|
|
|
+import com.zd.laboratory.domain.vo.LabHardwareVO;
|
|
|
+import com.zd.laboratory.mapper.LabHardwareMapper;
|
|
|
+import com.zd.laboratory.service.ILabControlService;
|
|
|
+import com.zd.laboratory.socket.command.Symbol;
|
|
|
+import com.zd.laboratory.socket.service.SocketService;
|
|
|
+import com.zd.model.constant.BaseConstants;
|
|
|
+import com.zd.model.constant.HttpStatus;
|
|
|
+import com.zd.model.domain.ResultData;
|
|
|
+import com.zd.model.enums.HardwareTypeEnum;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.redis.connection.Message;
|
|
|
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
|
|
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Optional;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * 当前由于没有消息中间件,所以只能暂时采用此种方式直接消费数据
|
|
|
+ * 而且由于服务端空间问题,高哥没让建新的项目
|
|
|
+ * 所以现在存在的问题:
|
|
|
+ * 1,发布订阅模式无法解决消息竞争的问题,当前通过服务去监听redis key 变动,实际是基于发布订阅模式,导致微服务存在状态,
|
|
|
+ * 当实例启动越多,会对消息处理存在重复处理的情况,应用的水平扩展存在问题,目前该类中消息处理的内容还是幂等的,问题不是很大
|
|
|
+ * 解决方法
|
|
|
+ * 1,将监听还有socket 等独立出来作为单体应用,剔除对服务状态有影响的部分,(缺点,该部分变成了单节点) 通过消息中间件 异步发送消息,
|
|
|
+ * 采用生产者消费者模式消费数据
|
|
|
+ *
|
|
|
+ *
|
|
|
+ *
|
|
|
+ * @Author: cyl
|
|
|
+ * @Date: 2023/3/15/11:19
|
|
|
+ * @Description:
|
|
|
+ */
|
|
|
+@Component
|
|
|
+public class RedisExpiredAndWorkListener extends KeyExpirationEventMessageListener {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private LabHardwareMapper labHardwareMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ILabControlService labControlService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SocketService socketService;
|
|
|
+
|
|
|
+ public RedisExpiredAndWorkListener(RedisMessageListenerContainer listenerContainer)
|
|
|
+ {
|
|
|
+ super(listenerContainer);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(Message message, byte[] bytes) {
|
|
|
+// String topic = new String(bytes);
|
|
|
+ String expiredKey = new String(message.getBody());
|
|
|
+ if(expiredKey.indexOf(BaseConstants.DELAY_QUEUE)!=-1){
|
|
|
+ String[] delayStr = expiredKey.split(BaseConstants.DELAY_QUEUE+"~");
|
|
|
+ JSONObject jsonObj = (JSONObject) JSONObject.parseObject(delayStr[1], Object.class, Feature.OrderedField);
|
|
|
+ LabDealyNotifyVo dealyNotifyVo = jsonObj.getObject(BaseConstants.DELAY_QUEUE, new TypeReference <LabDealyNotifyVo>(){});
|
|
|
+
|
|
|
+ //这里类型固定写死2,智能通风
|
|
|
+ List <Long> ids = new ArrayList <>();
|
|
|
+ ids.add(2L);
|
|
|
+ LabHardwareVO labHardwareVO = new LabHardwareVO();
|
|
|
+ labHardwareVO.setIds(ids);
|
|
|
+ labHardwareVO.setSubjectId(dealyNotifyVo.getSubId());
|
|
|
+ List<LabHardware> hardwareList = labHardwareMapper.selectNewLabHardwareByTypes(labHardwareVO);
|
|
|
+ Optional.ofNullable(hardwareList).orElseGet(Collections::emptyList)
|
|
|
+ .stream()
|
|
|
+ .forEach(a->{
|
|
|
+ if(dealyNotifyVo.getOpenOrCloseType().intValue()==1){
|
|
|
+ ResultData ResultData = socketService.sendMqttCommand(a.getId(), a.getRelayCode(), Symbol.command.open, a.getBit(),a.getSubjectId());
|
|
|
+ boolean equals = ResultData.getCode().equals(HttpStatus.SUCCESS);
|
|
|
+ //插入control 打开日志
|
|
|
+ if (equals) {
|
|
|
+ saveControl(a, 1);
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ ResultData ResultData = socketService.sendMqttCommand(a.getId(), a.getRelayCode(), Symbol.command.close, a.getBit(),a.getSubjectId());
|
|
|
+ boolean equals = ResultData.getCode().equals(HttpStatus.SUCCESS);
|
|
|
+ //插入control 关闭日志
|
|
|
+ if (equals) {
|
|
|
+ saveControl(a,0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void saveControl(LabHardware labHardware,Integer controlType){
|
|
|
+ LabControl control = new LabControl();
|
|
|
+ control.setTriggerModes(1);
|
|
|
+ control.setControlType(controlType);
|
|
|
+ control.setUserName("定时排风自动执行");
|
|
|
+ if(controlType.intValue()==1){
|
|
|
+ control.setOperation("定时排风:打开");
|
|
|
+ }else{
|
|
|
+ control.setOperation("定时排风:关闭");
|
|
|
+ }
|
|
|
+ control.setHardwareType(HardwareTypeEnum.AI_VENTILATION);
|
|
|
+ control.setHardwareId(labHardware.getId());
|
|
|
+ control.setSubjectId(labHardware.getSubjectId());
|
|
|
+ labControlService.insertLabControl(control);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|