package xn.xxp.mqtt import android.os.AsyncTask import android.util.Log import com.blankj.utilcode.util.LogUtils import com.blankj.utilcode.util.ThreadUtils import com.blankj.utilcode.util.ToastUtils import com.google.gson.Gson import com.google.gson.reflect.TypeToken import http.client.ApiRepository import http.client.HttpTool import http.vo.response.LabBulletinBoardVo import io.reactivex.rxjava3.disposables.CompositeDisposable import org.eclipse.paho.client.mqttv3.MqttMessage import org.greenrobot.eventbus.EventBus import org.json.JSONObject import xn.xxp.home.auth.AuthType import xn.xxp.mqtt.entity.BulletinBoardEntity import xn.xxp.mqtt.event.BulletinBoardEvent import xn.xxp.mqtt.event.LabInfoEvent import xn.xxp.mqtt.event.OnlineUserEvent import xn.xxp.mqtt.event.ThingsEvent import xn.xxp.mqtt.event.UpdateUiEvent import xn.xxp.mqtt.event.WarningEvent import xn.xxp.room.RoomTool import xn.xxp.room.bean.DeviceConfig import xn.xxp.room.bean.LabConfig import xn.xxp.utils.Constants import xn.xxp.utils.Tool /** * info * * @author ReiChin_ */ class MqttManager private constructor() { companion object { val instance: MqttManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { MqttManager() } } private val mGson = Gson() private lateinit var deviceConfig: DeviceConfig private lateinit var labConfig: LabConfig private lateinit var clientId: String private lateinit var mMqttWorker: MqttWorker fun initMqtt() { deviceConfig = RoomTool.getInstance().deviceConfigDao().deviceConfig labConfig = RoomTool.getInstance().labConfigDao().labConfig this.clientId = "${deviceConfig.devId}#${System.nanoTime()}" LogUtils.d( "MQTT配置", deviceConfig.mqttAddress, deviceConfig.mqttName, deviceConfig.mqttPas ) mMqttWorker = MqttWorker( deviceConfig.mqttAddress, clientId, deviceConfig.mqttName, deviceConfig.mqttPas ) mMqttWorker.setListener(object : MqttWorker.Listener { override fun onConnectSuccess() { LogUtils.d("MQTT连接成功") // 连接成功后,订阅主题 val topics = arrayOf( Constants.MqttConfig.Topic.MIDDLE_INFO.plus(labConfig.labId), Constants.MqttConfig.Topic.RIGHT_INFO.plus(labConfig.labId), Constants.MqttConfig.Topic.BULLETIN_BOARD.plus(labConfig.labId), Constants.MqttConfig.Topic.EMERGENCY, Constants.MqttConfig.Topic.THINGS.plus(labConfig.labId), Constants.MqttConfig.Topic.LAB_INFO.plus(labConfig.labId), Constants.MqttConfig.Topic.REMOTE_UNLOCK ) // 订阅主题 mMqttWorker.subscribe(topics, 0) } override fun messageArrived(topic: String?, message: MqttMessage?) { LogUtils.d("MQTT接收", mMqttWorker, topic, message) dispatchMessage(topic, message.toString()) } }) } fun connect() { mMqttWorker.connect() } /** * 释放单例, 及其所引用的资源 */ @Synchronized fun release() { try { mMqttWorker.setListener(null) mMqttWorker.release() } catch (e: Exception) { e.printStackTrace() } } /** * 分发消息 * * @param topic * @param message */ private fun dispatchMessage(topic: String?, message: String) { try { when (topic) { Constants.MqttConfig.Topic.MIDDLE_INFO.plus(labConfig.labId) -> { EventBus.getDefault() .post(UpdateUiEvent(Constants.MqttConfig.Topic.MIDDLE_INFO)) } Constants.MqttConfig.Topic.RIGHT_INFO.plus(labConfig.labId) -> { EventBus.getDefault().post(OnlineUserEvent()) } Constants.MqttConfig.Topic.BULLETIN_BOARD.plus(labConfig.labId) -> { val typeOfT = object : TypeToken>() {}.type val entity: List = mGson.fromJson(message, typeOfT) val data = BulletinBoardEntity.Data().apply { functionStatuses = entity } EventBus.getDefault().post(BulletinBoardEvent(data)) } Constants.MqttConfig.Topic.EMERGENCY -> { handleWarning() } Constants.MqttConfig.Topic.REMOTE_UNLOCK -> { LogUtils.d("Jayce", message) val jsonObject = JSONObject(message) val messageId: Long = jsonObject.getLong("messageId") val deviceNo: String = jsonObject.getString("deviceNo") if (deviceNo == deviceConfig.devId) { val status = Tool.INSTANCE.openDoor(null, null) if (2 != status) { LogUtils.d("开锁中") } else { AsyncTask.execute { HttpTool.remoteUnlock(messageId) } ThreadUtils.runOnUiThread { ToastUtils.showLong("远程开锁!") } } } } Constants.MqttConfig.Topic.THINGS.plus(labConfig.labId) -> { EventBus.getDefault().post(ThingsEvent()) } Constants.MqttConfig.Topic.LAB_INFO.plus(labConfig.labId) -> { // 实验室信息更新 EventBus.getDefault().post(LabInfoEvent(message = message)) } } } catch (e: Exception) { LogUtils.d(Log.getStackTraceString(e)) } } private fun handleWarning() { val disposable = ApiRepository.warnList(labConfig.labId.toString()) .subscribe({ data -> EventBus.getDefault().post(WarningEvent(data)) }, { throwable -> throwable.printStackTrace() }) CompositeDisposable().apply { if (!isDisposed) add(disposable) } } }