Procházet zdrojové kódy

修改mqtt ip、端口
mapper映射修改
web依赖添加

hanzhiwei před 3 roky
rodič
revize
c46ba50ace

+ 236 - 236
zd-modules/zd-algorithm/src/main/java/com/zd/alg/alarm/mqtt/MqttConfig.java

@@ -1,236 +1,236 @@
-package com.zd.alg.alarm.mqtt;
-
-import com.alibaba.druid.support.json.JSONUtils;
-import com.alibaba.fastjson.JSON;
-import com.zd.alg.alarm.domain.AlarmLog;
-import com.zd.alg.alarm.service.IAlarmLogService;
-import com.zd.alg.alarm.utils.AlarmUtil;
-import com.zd.common.core.utils.DateUtils;
-import com.zd.system.api.alarm.domain.AlarmEntrty;
-import com.zd.system.api.alarm.domain.Routes;
-import com.zd.system.api.alarm.domain.SendTypes;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
-import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
-import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
-import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
-import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.MessagingException;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * mqtt配置
- */
-@Configuration
-public class MqttConfig {
-
-    @Autowired
-    private AlarmUtil alarmUtil;
-    @Autowired
-    private IAlarmLogService alarmLogService;
-
-    static Logger logger = LoggerFactory.getLogger(MqttConfig.class);
-
-    private static final byte[] WILL_DATA;
-
-    static {
-        WILL_DATA = "offline".getBytes();
-    }
-
-    public static final String MQTT_INBOUND_CHANNEL = "mqttInboundChannel";
-
-
-    public static final String MQTT_OUTBOUND_CHANNEL = "mqttOutboundChannel";
-
-    @Value("${mqtt.username:}")
-    private String username;
-
-    @Value("${mqtt.password:}")
-    private String password;
-
-    @Value("${mqtt.url}")
-    private String url;
-
-    @Value("${mqtt.defaultTopic}")
-    private String defaultTopic;
-
-    @Value("${mqtt.producer.clientId}")
-    private String producerClientId;
-
-    @Value("${mqtt.consumer.clientId}")
-    private String consumerClientId;
-
-    @Value("${mqtt.consumer.maxInflight}")
-    private Integer maxInflight;
-
-    private MqttPahoMessageDrivenChannelAdapter adapter;
-
-    /**
-     * 连接mqtt配置
-     */
-    @Bean
-    public MqttConnectOptions mqttConnectOptions() {
-        MqttConnectOptions options = new MqttConnectOptions();
-        // false,服务器会保留客户端的连接记录
-        // true,表示每次连接到服务器都以新的身份连接
-        options.setCleanSession(false);
-        options.setUserName(username);
-        options.setPassword(password.toCharArray());
-//        options.setMaxInflight(maxInflight);
-        options.setServerURIs(StringUtils.split(url, ","));
-        //超时时间 单位为秒
-        options.setConnectionTimeout(10);
-        //会话心跳时间 单位: s, 间隔时间:1.5*20秒向客户端发送心跳判断客户端是否在线
-        options.setKeepAliveInterval(60);
-        //设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
-//        options.setWill("willTopic", WILL_DATA, 2, false);
-        return options;
-    }
-
-
-    /**
-     * MQTT客户端
-     */
-    @Bean
-    public MqttPahoClientFactory mqttClientFactory() {
-        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
-        factory.setConnectionOptions(mqttConnectOptions());
-        return factory;
-    }
-
-    /**
-     * 发送者消息通道
-     */
-    @Bean(name = MQTT_OUTBOUND_CHANNEL)
-    public MessageChannel mqttOutboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * 发送者消息处理
-     */
-    @Bean
-    @ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
-    public MessageHandler mqttOutbound() {
-        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
-                producerClientId,
-                mqttClientFactory());
-        messageHandler.setAsync(true);
-        messageHandler.setDefaultTopic(defaultTopic);
-        return messageHandler;
-    }
-
-    /**
-     * 消息订阅
-     */
-    @Bean
-    public MessageProducer inbound() {
-        // 可同时消费(订阅)多个Topic
-        adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
-        adapter.setCompletionTimeout(5000);
-        adapter.setConverter(new DefaultPahoMessageConverter());
-        adapter.setQos(2);
-        // 设置订阅通道
-        adapter.setOutputChannel(mqttInboundChannel());
-        return adapter;
-    }
-
-
-    /**
-     * 消费者消息通道
-     */
-    @Bean(name = MQTT_INBOUND_CHANNEL)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * 消费者消息处理
-     *设备心跳
-     */
-    @Bean
-    @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
-    public MessageHandler mqttInbound() {
-
-        return new MessageHandler() {
-            @Override
-            public void handleMessage(Message<?> message) throws MessagingException {
-                String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
-
-                logger.info("MQTT 接收 topic:  {}",receivedTopic);
-
-                String msg = (String) message.getPayload();
-                logger.info("MQTT 接收 消息体:  {}",msg);
-
-                try {
-                    String type = SendTypes.All+"";
-                    Map<String,Object> map = JSON.parseObject(msg, Map.class);
-
-                    map = JSON.parseObject(map.get("data")+"", Map.class);
-
-                    String text = (String) map.get("text");
-                    List<String> list = JSON.parseArray(map.get("to")+"", String.class);
-                    if(CollectionUtils.isEmpty(list)){
-                        logger.error("接收告警消息手机号为空!");
-                        return;
-                    }
-
-                    Set<String> set = new HashSet<>(list);
-                    String[] to = new String[set.size()];
-                    set.toArray(to);
-
-                    String dataType = (String) map.get("type");
-                    if(map.get("type") != null && ((SendTypes.All+"").equals(dataType)
-                            || (SendTypes.Call+"").equals(dataType) || (SendTypes.SMS+"").equals(dataType))){
-                        type = dataType;
-                    }
-
-                    AlarmEntrty alarmEntrty = new AlarmEntrty(Routes.NoticePush, to, type, text);
-
-                    map = alarmUtil.sendPost(alarmEntrty);
-
-                    String data = (String) map.get("Reply");
-                    if("OK".equals(data)){
-                        String[] phones = alarmEntrty.getTo();
-                        for (String phone : phones) {
-                            AlarmLog alarmLog = new AlarmLog();
-                            alarmLog.setRemark("预案ID/实验室ID: " + receivedTopic.replace(defaultTopic, ""));
-                            alarmLog.setIsBack(0);
-                            alarmLog.setStatus("失败");
-                            alarmLog.setPhone(phone);
-                            alarmLog.setNotice(alarmEntrty.getText());
-                            alarmLog.setCreateTime(DateUtils.getNowDate());
-
-                            alarmLogService.insertAlarmLog(alarmLog);
-                        }
-                    }else {
-                        logger.error("报警电话推送错误:" + JSONUtils.toJSONString(map));
-                    }
-                }catch (Exception e){
-                    e.printStackTrace();
-                    logger.error("报警处理数据异常:" + e.getMessage());
-                }
-
-            }
-        };
-    }
-}
+//package com.zd.alg.alarm.mqtt;
+//
+//import com.alibaba.druid.support.json.JSONUtils;
+//import com.alibaba.fastjson.JSON;
+//import com.zd.alg.alarm.domain.AlarmLog;
+//import com.zd.alg.alarm.service.IAlarmLogService;
+//import com.zd.alg.alarm.utils.AlarmUtil;
+//import com.zd.common.core.utils.DateUtils;
+//import com.zd.system.api.alarm.domain.AlarmEntrty;
+//import com.zd.system.api.alarm.domain.Routes;
+//import com.zd.system.api.alarm.domain.SendTypes;
+//import org.apache.commons.collections4.CollectionUtils;
+//import org.apache.commons.lang.StringUtils;
+//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.integration.annotation.ServiceActivator;
+//import org.springframework.integration.channel.DirectChannel;
+//import org.springframework.integration.core.MessageProducer;
+//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+//import org.springframework.integration.mqtt.support.MqttHeaders;
+//import org.springframework.messaging.Message;
+//import org.springframework.messaging.MessageChannel;
+//import org.springframework.messaging.MessageHandler;
+//import org.springframework.messaging.MessagingException;
+//
+//import java.util.HashSet;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Set;
+//
+///**
+// * mqtt配置
+// */
+//@Configuration
+//public class MqttConfig {
+//
+//    @Autowired
+//    private AlarmUtil alarmUtil;
+//    @Autowired
+//    private IAlarmLogService alarmLogService;
+//
+//    static Logger logger = LoggerFactory.getLogger(MqttConfig.class);
+//
+//    private static final byte[] WILL_DATA;
+//
+//    static {
+//        WILL_DATA = "offline".getBytes();
+//    }
+//
+//    public static final String MQTT_INBOUND_CHANNEL = "mqttInboundChannel";
+//
+//
+//    public static final String MQTT_OUTBOUND_CHANNEL = "mqttOutboundChannel";
+//
+//    @Value("${mqtt.username:}")
+//    private String username;
+//
+//    @Value("${mqtt.password:}")
+//    private String password;
+//
+//    @Value("${mqtt.url}")
+//    private String url;
+//
+//    @Value("${mqtt.defaultTopic}")
+//    private String defaultTopic;
+//
+//    @Value("${mqtt.producer.clientId}")
+//    private String producerClientId;
+//
+//    @Value("${mqtt.consumer.clientId}")
+//    private String consumerClientId;
+//
+//    @Value("${mqtt.consumer.maxInflight}")
+//    private Integer maxInflight;
+//
+//    private MqttPahoMessageDrivenChannelAdapter adapter;
+//
+//    /**
+//     * 连接mqtt配置
+//     */
+//    @Bean
+//    public MqttConnectOptions mqttConnectOptions() {
+//        MqttConnectOptions options = new MqttConnectOptions();
+//        // false,服务器会保留客户端的连接记录
+//        // true,表示每次连接到服务器都以新的身份连接
+//        options.setCleanSession(false);
+//        options.setUserName(username);
+//        options.setPassword(password.toCharArray());
+////        options.setMaxInflight(maxInflight);
+//        options.setServerURIs(StringUtils.split(url, ","));
+//        //超时时间 单位为秒
+//        options.setConnectionTimeout(10);
+//        //会话心跳时间 单位: s, 间隔时间:1.5*20秒向客户端发送心跳判断客户端是否在线
+//        options.setKeepAliveInterval(60);
+//        //设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
+////        options.setWill("willTopic", WILL_DATA, 2, false);
+//        return options;
+//    }
+//
+//
+//    /**
+//     * MQTT客户端
+//     */
+//    @Bean
+//    public MqttPahoClientFactory mqttClientFactory() {
+//        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+//        factory.setConnectionOptions(mqttConnectOptions());
+//        return factory;
+//    }
+//
+//    /**
+//     * 发送者消息通道
+//     */
+//    @Bean(name = MQTT_OUTBOUND_CHANNEL)
+//    public MessageChannel mqttOutboundChannel() {
+//        return new DirectChannel();
+//    }
+//
+//    /**
+//     * 发送者消息处理
+//     */
+//    @Bean
+//    @ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
+//    public MessageHandler mqttOutbound() {
+//        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
+//                producerClientId,
+//                mqttClientFactory());
+//        messageHandler.setAsync(true);
+//        messageHandler.setDefaultTopic(defaultTopic);
+//        return messageHandler;
+//    }
+//
+//    /**
+//     * 消息订阅
+//     */
+//    @Bean
+//    public MessageProducer inbound() {
+//        // 可同时消费(订阅)多个Topic
+//        adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId, mqttClientFactory(), StringUtils.split(defaultTopic, ","));
+//        adapter.setCompletionTimeout(5000);
+//        adapter.setConverter(new DefaultPahoMessageConverter());
+//        adapter.setQos(2);
+//        // 设置订阅通道
+//        adapter.setOutputChannel(mqttInboundChannel());
+//        return adapter;
+//    }
+//
+//
+//    /**
+//     * 消费者消息通道
+//     */
+//    @Bean(name = MQTT_INBOUND_CHANNEL)
+//    public MessageChannel mqttInboundChannel() {
+//        return new DirectChannel();
+//    }
+//
+//    /**
+//     * 消费者消息处理
+//     *设备心跳
+//     */
+//    @Bean
+//    @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
+//    public MessageHandler mqttInbound() {
+//
+//        return new MessageHandler() {
+//            @Override
+//            public void handleMessage(Message<?> message) throws MessagingException {
+//                String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
+//
+//                logger.info("MQTT 接收 topic:  {}",receivedTopic);
+//
+//                String msg = (String) message.getPayload();
+//                logger.info("MQTT 接收 消息体:  {}",msg);
+//
+//                try {
+//                    String type = SendTypes.All+"";
+//                    Map<String,Object> map = JSON.parseObject(msg, Map.class);
+//
+//                    map = JSON.parseObject(map.get("data")+"", Map.class);
+//
+//                    String text = (String) map.get("text");
+//                    List<String> list = JSON.parseArray(map.get("to")+"", String.class);
+//                    if(CollectionUtils.isEmpty(list)){
+//                        logger.error("接收告警消息手机号为空!");
+//                        return;
+//                    }
+//
+//                    Set<String> set = new HashSet<>(list);
+//                    String[] to = new String[set.size()];
+//                    set.toArray(to);
+//
+//                    String dataType = (String) map.get("type");
+//                    if(map.get("type") != null && ((SendTypes.All+"").equals(dataType)
+//                            || (SendTypes.Call+"").equals(dataType) || (SendTypes.SMS+"").equals(dataType))){
+//                        type = dataType;
+//                    }
+//
+//                    AlarmEntrty alarmEntrty = new AlarmEntrty(Routes.NoticePush, to, type, text);
+//
+//                    map = alarmUtil.sendPost(alarmEntrty);
+//
+//                    String data = (String) map.get("Reply");
+//                    if("OK".equals(data)){
+//                        String[] phones = alarmEntrty.getTo();
+//                        for (String phone : phones) {
+//                            AlarmLog alarmLog = new AlarmLog();
+//                            alarmLog.setRemark("预案ID/实验室ID: " + receivedTopic.replace(defaultTopic, ""));
+//                            alarmLog.setIsBack(0);
+//                            alarmLog.setStatus("失败");
+//                            alarmLog.setPhone(phone);
+//                            alarmLog.setNotice(alarmEntrty.getText());
+//                            alarmLog.setCreateTime(DateUtils.getNowDate());
+//
+//                            alarmLogService.insertAlarmLog(alarmLog);
+//                        }
+//                    }else {
+//                        logger.error("报警电话推送错误:" + JSONUtils.toJSONString(map));
+//                    }
+//                }catch (Exception e){
+//                    e.printStackTrace();
+//                    logger.error("报警处理数据异常:" + e.getMessage());
+//                }
+//
+//            }
+//        };
+//    }
+//}

+ 4 - 5
zd-modules/zd-algorithm/src/main/java/com/zd/alg/alarm/utils/AlarmUtil.java

@@ -14,7 +14,6 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
@@ -32,11 +31,11 @@ import java.util.*;
 public class AlarmUtil {
     private static final Logger logger = LoggerFactory.getLogger(AlarmUtil.class);
 
-    @Value("${alarm.host}")
-    private String alarmUrl ;
+//    @Value("${alarm.host}")
+    private String alarmUrl = "192.168.1.100" ;
 
-    @Value("${alarm.retry}")
-    private Integer retryCount ;
+//    @Value("${alarm.retry}")
+    private Integer retryCount = 3 ;
 
     @Autowired
     private IAlarmLogService alarmLogService;

+ 5 - 0
zd-modules/zd-base/pom.xml

@@ -14,6 +14,11 @@
     <description>基础服务</description>
 
     <dependencies>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
         <!-- SpringCloud Alibaba Nacos -->
         <dependency>
             <groupId>com.alibaba.cloud</groupId>

+ 6 - 6
zd-modules/zd-base/src/main/resources/mapper/message/UserOpenIdMapper.xml

@@ -2,9 +2,9 @@
 <!DOCTYPE mapper
 PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
 "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="com.zd.message.mapper.UserOpenIdMapper">
+<mapper namespace="com.zd.base.message.mapper.UserOpenIdMapper">
     
-    <resultMap type="com.zd.message.domain.UserOpenId" id="UserOpenIdResult">
+    <resultMap type="com.zd.base.message.domain.UserOpenId" id="UserOpenIdResult">
         <result property="id"    column="id"    />
         <result property="userId"    column="user_id"    />
         <result property="openId"    column="open_id"    />
@@ -16,7 +16,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
     <sql id="selectUserOpenIdListVo">
         select t.id, t.user_id, t.open_id from qp_user_open_id as t
     </sql>
-    <select id="selectUserOpenIdList" parameterType="com.zd.message.domain.UserOpenId" resultMap="UserOpenIdResult">
+    <select id="selectUserOpenIdList" parameterType="com.zd.base.message.domain.UserOpenId" resultMap="UserOpenIdResult">
         <include refid="selectUserOpenIdVo"/>
         <where>  
             <if test="openId != null  and openId != ''"> and open_id = #{openId}</if>
@@ -37,12 +37,12 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         <include refid="selectUserOpenIdVo"/>
         where id = #{id}
     </select>
-    <select id="getByUserId" resultType="com.zd.message.domain.UserOpenId" parameterType="java.lang.Long">
+    <select id="getByUserId" resultType="com.zd.base.message.domain.UserOpenId" parameterType="java.lang.Long">
         <include refid="selectUserOpenIdVo"/>
         where user_id = #{userId}
     </select>
 
-    <insert id="insertUserOpenId" parameterType="com.zd.message.domain.UserOpenId" useGeneratedKeys="true" keyProperty="id">
+    <insert id="insertUserOpenId" parameterType="com.zd.base.message.domain.UserOpenId" useGeneratedKeys="true" keyProperty="id">
         insert into qp_user_open_id
         <trim prefix="(" suffix=")" suffixOverrides=",">
     <if test="userId != null">user_id,</if>
@@ -56,7 +56,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
          </trim>
     </insert>
 
-    <update id="updateUserOpenId" parameterType="com.zd.message.domain.UserOpenId">
+    <update id="updateUserOpenId" parameterType="com.zd.base.message.domain.UserOpenId">
         update qp_user_open_id
         <trim prefix="SET" suffixOverrides=",">
             <if test="userId != null">user_id = #{userId},</if>