带图DSL的Websocket

问题描述 投票:1回答:1

我正在尝试使用Akka Flow实现Websocket登录流程。我在Inlet,Outlets和Connection问题上遇到了无数令人讨厌的运行时异常。我的最新消息是:

java.lang.IllegalStateException: Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected.

片段:

object Login {

    def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String) =
        Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
            import GraphDSL.Implicits._

            val in = Source.fromFuture(future)
            in.named("LoginData")

            val fanIn = Zip[LoginResponse, LoginCommand.UserData]
            val exasolLogin = builder.add(Http(system).webSocketClientFlow(WebSocketRequest(socketUrl)))
            val encryptLoginData = FlowShape(exasolLogin.in, fanIn.out)

            val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))

            val announceLogin = Source.single(LoginCommand)

            in -> fanIn
            announceLogin -> exasolAnnounce -> fanIn

            fanIn -> encryptLoginData -> exasolLogin

            SourceShape(exasolLogin.out)
    })
}

我可能正在使用完全错误的DSL,因为我还没有找到一个解释图形,形状,流量,物化值深度的单一文章。有人可以指出我做错了什么或者可能是怎么写的?


编辑1:

现在已经用->替换了~>并得到了令人讨厌的编译错误:

object Login {

    def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String) =
        Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
            import GraphDSL.Implicits._

            val in = Source.fromFuture(future)
            in.named("LoginData")

            val fanIn = builder.add(Zip[LoginResponse, LoginCommand.UserData])
            val exasolLogin = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
            val encryptLoginData = Flow[(LoginResponse, LoginCommand.UserData)].map(data => data._1)
            val loginDataMessage = Flow[LoginCommand.UserData].map(data => TextMessage("bar"))

            val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
            val announceResponse = Flow[Message].map(data => LoginResponse("key", "mod", "exp"))

            val loginMessage = Flow[LoginCommand].map(data => TextMessage("foo"))
            val session = builder.add(Flow[Message].map(data => LoginCommand.SessionData(0, 1, "2", "db", "w", 59, 546, 45, "q", "TZ", "TZB")))

            in ~> fanIn.in1
            Source.single(LoginCommand) ~> loginMessage ~> exasolAnnounce ~> announceResponse ~> fanIn.in0
            fanIn.out ~> encryptLoginData ~> loginDataMessage ~> exasolLogin ~> session

            SourceShape(session.out)
    })
}

这导致

exasol-client/LoginGraph.scala:42: error: overloaded method value ~> with alternatives:
  (to: akka.stream.SinkShape[exasol.LoginCommand.type])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit <and>
  (to: akka.stream.Graph[akka.stream.SinkShape[exasol.LoginCommand.type], _])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit <and>
  [Out](flow: akka.stream.FlowShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [Out](junction: akka.stream.UniformFanOutShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [Out](junction: akka.stream.UniformFanInShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [Out](via: akka.stream.Graph[akka.stream.FlowShape[exasol.LoginCommand.type,Out],Any])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
  [U >: exasol.LoginCommand.type](to: akka.stream.Inlet[U])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit
 cannot be applied to (akka.stream.FlowShape[exasol.LoginCommand,akka.http.scaladsl.model.ws.TextMessage.Strict])
            Source.single(LoginCommand) ~> loginMessage ~> exasolAnnounce ~> announceResponse ~> fanIn.in0
scala akka akka-stream akka-http
1个回答
0
投票

你需要这样的东西:

object Login {
    def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String): Source[Message, NotUsed] =
        Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
            import GraphDSL.Implicits._

            val in = Source.fromFuture(future)
            in.named("LoginData")

            val fanIn = builder.add(Zip[LoginResponse, LoginCommand.UserData])
            val exasolLogin = builder.add(Http(system).webSocketClientFlow(WebSocketRequest(socketUrl)))
            val encryptLoginData = Flow[(LoginResponse, LoginCommand.UserData)].map(data => TextMessage(data.toString)) //stub
            val encryptAnnounceData = Flow[LoginCommand].map(data => TextMessage(data.toString)) //stub
            val decryptAnnounceData = Flow[Message].map(message => LoginResponse(message)) //stub
            val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
            val announceLogin = Source.single(LoginCommand)

            in ~> fanIn.in1
            announceLogin ~> encryptAnnounceData ~> exasolAnnounce ~> decryptAnnounceData ~> fanIn.in0
            fanIn.out ~> encryptLoginData ~> exasolLogin

            SourceShape(exasolLogin.out)
        })
}

请记住, - >和〜>是不同的运算符(您应该使用〜>)。只有在要手动连接形状入口和出口时,才需要为构建器添加形状。

© www.soinside.com 2019 - 2024. All rights reserved.