使用 ScalaFX 监控 Akka 流源

问题描述 投票:0回答:1

我要解决的是以下情况:
给定一个无限运行的 Akka Stream,我希望能够监视流的某些点。我能想到的最好方法是在哪里将消息发送给一个 Actor,它也是一个

Source
。这使得我可以非常灵活地连接单个源或将多个源合并到 websocket 或我想要连接的任何其他客户端。然而,在这种特定情况下,我尝试将 ScalaFX 与 Akka Source 连接,但它没有按预期工作。

当我运行下面的代码时,两个计数器开始都正常,但过了一会儿,其中一个计数器停止并且永远不会恢复。我知道使用 ScalaFX 时需要考虑线程的特殊注意事项,但我没有足够的知识来理解这里发生的情况或调试它。下面是一个最小的运行示例,大约 5 秒后问题应该会显现出来。

我的问题是:

如何更改此代码以使其按预期工作?

import akka.NotUsed

import scalafx.Includes._
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scalafx.application.JFXApp
import scalafx.beans.property.{IntegerProperty, StringProperty}
import scalafx.scene.Scene
import scalafx.scene.layout.BorderPane
import scalafx.scene.text.Text
import scala.concurrent.duration._

/**
  * Created by henke on 2017-06-10.
  */
object MonitorApp extends JFXApp {

  implicit val system = ActorSystem("monitor")
  implicit val mat = ActorMaterializer()

  val value1 = StringProperty("0")
  val value2 = StringProperty("0")

  stage = new JFXApp.PrimaryStage {
    title = "Akka Stream Monitor"
    scene = new Scene(600, 400) {
      root = new BorderPane() {
        left = new Text {
          text <== value1
        }
        right = new Text {
          text <== value2
        }
      }
    }
  }

  override def stopApp() = system.terminate()

  val monitor1 = createMonitor[Int]
  val monitor2 = createMonitor[Int]

  val marketChangeActor1 = monitor1
    .to(Sink.foreach{ v =>
      value1() = v.toString
    }).run()

  val marketChangeActor2 = monitor2
    .to(Sink.foreach{ v =>
      value2() = v.toString
    }).run()

  val monitorActor = Source[Int](1 to 100)
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .via(logToMonitorAndContinue(marketChangeActor1))
    .map(_ * 10)
    .via(logToMonitorAndContinue(marketChangeActor2))
    .to(Sink.ignore).run()

  def createMonitor[T]: Source[T, ActorRef] = Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail)

  def logToMonitorAndContinue[T](monitor: ActorRef): Flow[T, T, NotUsed] = {
    Flow[T].map{ e =>
      monitor ! e
      e
    }
  }
}
scala javafx akka akka-stream scalafx
1个回答
1
投票

您似乎在 Actor 系统线程中为属性赋值(从而影响 UI)。但是,与 UI 的所有交互都应在 JavaFX GUI 线程中完成。尝试将

value1() = v.toString
和第二个包装在 Platform.runLater 调用中。

除了 JavaFX-Swing 集成文档之外,我找不到关于使用

runLater
与 JavaFX 数据交互的明确声明,但这在 UI 库中很常见;例如,Swing 及其
SwingUtilities.invokeLater
方法也是如此。

© www.soinside.com 2019 - 2024. All rights reserved.