无法使用 HiveMQ Async Client 从我的本地计算机连接到任何 MQTT 代理

问题描述 投票:0回答:1

我有以下发布方法,它连接到给定的代理并发送消息,然后断开连接:

 def publish(mqttCfg: MqttConfig, topic: String, mqttQos: MqttQos): Future[Unit] = {
    val client = asyncMqttClient(mqttCfg)

    // Define a custom wrapper type to represent the result of the publish operation
    sealed trait PublishResult
    case class SuccessfulPublish(mqttPublishResult: Mqtt5PublishResult) extends PublishResult
    case class FailedPublish(error: Throwable) extends PublishResult

    asyncMqttClient(mqttCfg).connect()
      .thenCompose(_ => client.publishWith().topic(topic).qos(mqttQos).payload("HELLO WORLD!".getBytes()).send())
      .thenAccept(result => {
        val publishResult = Try(result)
        publishResult match {
          case Success(message) =>
            println(s"publishedResult: $message") // TODO: Change to logger
          case Failure(error) =>
            println(s"Failed to publish: ${error.getMessage}") // TODO: Change to logg
        }
      })
      .thenCompose(_ => client.disconnect())
      .thenAccept(_ => println("disconnected"))
      .asScala.map(_ => ())
  }

然后我有一个 Scala 测试,简单地测试如下:

  "MqttClientFactory#publish" should "connect to a local MQTT broker and publish" in {
    val mqttConfig = MqttConfig("cpo-platform-test", "test.mosquitto.org", 1883)
    val published = MqttClientFactory.publish(
      mqttConfig,
      "cpo-test-topic",
      MqttQos.EXACTLY_ONCE
    )
    whenReady(published, timeout(Span(100, Seconds))) { Unit => {
      val client = MqttClientFactory.asyncMqttClient(mqttConfig)
      println("In here ****************** ")
      client
        .connect()
        .thenCompose(_ => client.subscribeWith().topicFilter("cpo-test-topic").qos(MqttQos.EXACTLY_ONCE).callback(println).send())
      }
    }
  }

当我运行这个时,它会在我等待Future在whenReady(......)中完成的地方导致以下错误

The future returned an exception of type: java.util.concurrent.CompletionException, with message: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected..
ScalaTestFailureLocation: com.openelectrons.cpo.mqtt.MqttClientFactoryTest at (MqttClientFactoryTest.scala:29)

我在本地计算机上尝试了多个代理,即 eclipse mosquitto 代理、cedalo 代理,它们都返回相同的消息。我究竟做错了什么?有一个简单的连接来让它工作是非常烦人的。有什么帮助吗?

scala mqtt mosquitto hivemq
1个回答
0
投票

这里有一个简单的 POC,我可以在本地运行它。它不验证任何东西。我只启动一个 eclipse-mosquitto 容器,使用 hivemq-mqtt-client 连接到服务,发布消息,订阅主题并将收到的消息打印到

stdout

  • build.sbt
libraryDependencies ++= Seq(
  "com.hivemq" % "hivemq-mqtt-client" % "1.3.2",
  "org.scalatest" %% "scalatest" % "3.2.16" % Test,
  "com.dimafeng" %% "testcontainers-scala-scalatest" % TestcontainersScalaVersion % Test
)
  • docker-compose.yaml
version: "3"
services:
  mosquitto:
    image: eclipse-mosquitto:2.0.18
    volumes:
      - /absolute/path/to/mosquitto/config/:/mosquitto/config
    ports:
      - 1883:1883
      - 9001:9001
  • /absolute/path/to/mosquitto/config/mosquitto.conf
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
  • DummyMosquittoTest.scala
import com.dimafeng.testcontainers.scalatest.TestContainersForAll
import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService}
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.{Mqtt5AsyncClient, Mqtt5Client}
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.matchers.should.Matchers

import java.io.File
import java.util.UUID
import scala.jdk.FutureConverters._

class TestcontainersMainTest
    extends AsyncFunSuite
    with Matchers
    with TestContainersForAll {

  override type Containers = DockerComposeContainer

  override def startContainers(): DockerComposeContainer = {
    DockerComposeContainer
      .Def(
        composeFiles = new File(
          this.getClass.getClassLoader
            .getResource("docker-compose.yaml")
            .getFile
        ),
        exposedServices = Seq(ExposedService(name = "mosquitto", port = 1883))
      )
      .start()
  }

  test("mosquitto container") {
    withContainers { container =>
      val client: Mqtt5AsyncClient = Mqtt5Client
        .builder()
        .identifier(UUID.randomUUID().toString())
        .serverHost("broker.hivemq.com")
        .buildAsync()

      client
        .connect()
        .thenCompose(_ =>
          client
            .publishWith()
            .topic("test/topic")
            .payload("some random message!!!".getBytes)
            .send()
        )
        .asScala
        .map(_ => 1 should be(1))

      client
        .subscribeWith()
        .topicFilter("test/topic")
        .qos(MqttQos.EXACTLY_ONCE)
        .callback(x => println(new String(x.getPayloadAsBytes)))
        .send()
        .asScala
        .map(_ => 1 should be(1))
    }
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.