MqttManager.kt 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package xn.xxp.mqtt
  2. import android.os.AsyncTask
  3. import android.util.Log
  4. import com.blankj.utilcode.util.LogUtils
  5. import com.blankj.utilcode.util.ThreadUtils
  6. import com.blankj.utilcode.util.ToastUtils
  7. import com.google.gson.Gson
  8. import com.google.gson.reflect.TypeToken
  9. import http.client.ApiRepository
  10. import http.client.HttpTool
  11. import http.vo.response.LabBulletinBoardVo
  12. import io.reactivex.rxjava3.disposables.CompositeDisposable
  13. import org.eclipse.paho.client.mqttv3.MqttMessage
  14. import org.greenrobot.eventbus.EventBus
  15. import org.json.JSONObject
  16. import xn.xxp.home.auth.AuthType
  17. import xn.xxp.mqtt.entity.BulletinBoardEntity
  18. import xn.xxp.mqtt.event.BulletinBoardEvent
  19. import xn.xxp.mqtt.event.LabInfoEvent
  20. import xn.xxp.mqtt.event.OnlineUserEvent
  21. import xn.xxp.mqtt.event.ThingsEvent
  22. import xn.xxp.mqtt.event.UpdateUiEvent
  23. import xn.xxp.mqtt.event.WarningEvent
  24. import xn.xxp.room.RoomTool
  25. import xn.xxp.room.bean.DeviceConfig
  26. import xn.xxp.room.bean.LabConfig
  27. import xn.xxp.utils.Constants
  28. import xn.xxp.utils.Tool
  29. /**
  30. * info
  31. *
  32. * @author ReiChin_
  33. */
  34. class MqttManager private constructor() {
  35. companion object {
  36. val instance: MqttManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
  37. MqttManager()
  38. }
  39. }
  40. private val mGson = Gson()
  41. private lateinit var deviceConfig: DeviceConfig
  42. private lateinit var labConfig: LabConfig
  43. private lateinit var clientId: String
  44. private lateinit var mMqttWorker: MqttWorker
  45. fun initMqtt() {
  46. deviceConfig = RoomTool.getInstance().deviceConfigDao().deviceConfig
  47. labConfig = RoomTool.getInstance().labConfigDao().labConfig
  48. this.clientId = "${deviceConfig.devId}#${System.nanoTime()}"
  49. LogUtils.d(
  50. "MQTT配置",
  51. deviceConfig.mqttAddress,
  52. deviceConfig.mqttName,
  53. deviceConfig.mqttPas
  54. )
  55. mMqttWorker = MqttWorker(
  56. deviceConfig.mqttAddress,
  57. clientId,
  58. deviceConfig.mqttName,
  59. deviceConfig.mqttPas
  60. )
  61. mMqttWorker.setListener(object : MqttWorker.Listener {
  62. override fun onConnectSuccess() {
  63. LogUtils.d("MQTT连接成功")
  64. // 连接成功后,订阅主题
  65. val topics = arrayOf(
  66. Constants.MqttConfig.Topic.MIDDLE_INFO.plus(labConfig.labId),
  67. Constants.MqttConfig.Topic.RIGHT_INFO.plus(labConfig.labId),
  68. Constants.MqttConfig.Topic.BULLETIN_BOARD.plus(labConfig.labId),
  69. Constants.MqttConfig.Topic.EMERGENCY,
  70. Constants.MqttConfig.Topic.THINGS.plus(labConfig.labId),
  71. Constants.MqttConfig.Topic.LAB_INFO.plus(labConfig.labId),
  72. Constants.MqttConfig.Topic.REMOTE_UNLOCK
  73. )
  74. // 订阅主题
  75. mMqttWorker.subscribe(topics, 0)
  76. }
  77. override fun messageArrived(topic: String?, message: MqttMessage?) {
  78. LogUtils.d("MQTT接收", mMqttWorker, topic, message)
  79. dispatchMessage(topic, message.toString())
  80. }
  81. })
  82. }
  83. fun connect() {
  84. mMqttWorker.connect()
  85. }
  86. /**
  87. * 释放单例, 及其所引用的资源
  88. */
  89. @Synchronized
  90. fun release() {
  91. try {
  92. mMqttWorker.setListener(null)
  93. mMqttWorker.release()
  94. } catch (e: Exception) {
  95. e.printStackTrace()
  96. }
  97. }
  98. /**
  99. * 分发消息
  100. *
  101. * @param topic
  102. * @param message
  103. */
  104. private fun dispatchMessage(topic: String?, message: String) {
  105. try {
  106. when (topic) {
  107. Constants.MqttConfig.Topic.MIDDLE_INFO.plus(labConfig.labId) -> {
  108. EventBus.getDefault()
  109. .post(UpdateUiEvent(Constants.MqttConfig.Topic.MIDDLE_INFO))
  110. }
  111. Constants.MqttConfig.Topic.RIGHT_INFO.plus(labConfig.labId) -> {
  112. EventBus.getDefault().post(OnlineUserEvent())
  113. }
  114. Constants.MqttConfig.Topic.BULLETIN_BOARD.plus(labConfig.labId) -> {
  115. val typeOfT = object : TypeToken<List<LabBulletinBoardVo>>() {}.type
  116. val entity: List<LabBulletinBoardVo> = mGson.fromJson(message, typeOfT)
  117. val data = BulletinBoardEntity.Data().apply {
  118. functionStatuses = entity
  119. }
  120. EventBus.getDefault().post(BulletinBoardEvent(data))
  121. }
  122. Constants.MqttConfig.Topic.EMERGENCY -> {
  123. handleWarning()
  124. }
  125. Constants.MqttConfig.Topic.REMOTE_UNLOCK -> {
  126. LogUtils.d("Jayce", message)
  127. val jsonObject = JSONObject(message)
  128. val messageId: Long = jsonObject.getLong("messageId")
  129. val deviceNo: String = jsonObject.getString("deviceNo")
  130. if (deviceNo == deviceConfig.devId) {
  131. val status = Tool.INSTANCE.openDoor(null, null)
  132. if (2 != status) {
  133. LogUtils.d("开锁中")
  134. } else {
  135. AsyncTask.execute { HttpTool.remoteUnlock(messageId) }
  136. ThreadUtils.runOnUiThread { ToastUtils.showLong("远程开锁!") }
  137. }
  138. }
  139. }
  140. Constants.MqttConfig.Topic.THINGS.plus(labConfig.labId) -> {
  141. EventBus.getDefault().post(ThingsEvent())
  142. }
  143. Constants.MqttConfig.Topic.LAB_INFO.plus(labConfig.labId) -> {
  144. // 实验室信息更新
  145. EventBus.getDefault().post(LabInfoEvent(message = message))
  146. }
  147. }
  148. } catch (e: Exception) {
  149. LogUtils.d(Log.getStackTraceString(e))
  150. }
  151. }
  152. private fun handleWarning() {
  153. val disposable = ApiRepository.warnList(labConfig.labId.toString())
  154. .subscribe({ data ->
  155. EventBus.getDefault().post(WarningEvent(data))
  156. }, { throwable ->
  157. throwable.printStackTrace()
  158. })
  159. CompositeDisposable().apply {
  160. if (!isDisposed) add(disposable)
  161. }
  162. }
  163. }