Web socket messages not being sent or received from the web client


In a Play application, I created a socket server:

package controllers


import play.api.mvc._
import play.api.libs.streams.ActorFlow
import javax.inject.Inject
import akka.actor.ActorSystem
import akka.stream.Materializer

class Application @Inject() (cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer)
  extends AbstractController(cc) {
  def socket = WebSocket.accept[String, String] { request =>
    ActorFlow.actorRef { out =>
      MyWebSocketActor.props(out)
    }
  }
}

import akka.actor._

object MyWebSocketActor {
  def props(out: ActorRef) = Props(new MyWebSocketActor(out))
}

class MyWebSocketActor(out: ActorRef) extends Actor {
  def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }
}

I updated the routes file:

GET      /ws                                   controllers.Application.socket

After reading https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html

I defined WebSocketClientFlow:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {

  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
    }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/ws"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
      .toMat(incoming)(Keep.both) // also keep the Future[Done]
      .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

Running WebSocketClientFlow prints the following:

Success(Done)
closed

This corresponds to these lines being executed:

connected.onComplete(println)
closed.foreach(_ => println("closed"))

So it appears that the socket is being created and accessed successfully.

However, MyWebSocketActor should reply with the message it received, and that reply is never output:

  def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }

The "hello world!" message is sent in WebSocketClientFlow:

val outgoing = Source.single(TextMessage("hello world!"))

How can I print the sent message on the socket server and print the response in the client?

The output sent to the client is defined in MyWebSocketActor:

out ! ("I received your message: " + msg)

Update:

The receive function:

  def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }

is being invoked, but the socket client does not appear to receive the message sent via out ("I received your message: " + msg).

I tried changing the response sent to the client from

out ! ("I received your message: " + msg)

to

out ! TextMessage("hello world!")

which should invoke:

Sink.foreach[Message] {
  case message: TextMessage.Strict =>
    println(message.text)
}

But the message is not received, since println(message.text) is never called.

scala websocket playframework akka
1 answer

Posting an answer in case it is useful to other developers. If anything can be improved, please comment.

Exposing a socket:

package controllers


import play.api.mvc._
import play.api.libs.streams.ActorFlow
import javax.inject.Inject
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.Materializer
import akka.actor._

class Application @Inject()(cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer)
  extends AbstractController(cc) {
  def socket = WebSocket.accept[String, String] { request =>
    ActorFlow.actorRef { out =>
      MyWebSocketActor.props(out)
    }
  }
}

object MyWebSocketActor {

  var counter = 0

  def props(out: ActorRef) = Props(new MyWebSocketActor(out))
}

class MyWebSocketActor(out: ActorRef) extends Actor {

  def receive = {
    case msg: String =>
      MyWebSocketActor.counter = MyWebSocketActor.counter + 1
      print("received message: " + msg + "," + MyWebSocketActor.counter)
      out ! "hello world! "
  }
}

Testing the socket:

import akka.actor.{ActorRef, ActorSystem}
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val req = WebSocketRequest(uri = "ws://localhost:9000/ws")
    val webSocketFlow = Http().webSocketClientFlow(req)

    val messageSource: Source[Message, ActorRef] =
      Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

    val messageSink: Sink[Message, NotUsed] =
      Flow[Message]
        .map(message => println(s"Received text message: [$message]"))
        .to(Sink.ignore)

    val ((ws, upgradeResponse), closed) =
      messageSource
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(messageSink)(Keep.both)
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    ws ! TextMessage.Strict("Hello World")
    ws ! TextMessage.Strict("Hi")
    ws ! TextMessage.Strict("Yay!")

  }
}

On the Play run console, this prints:

[info] play.api.Play - Application started (Dev) (no global state)
received message: Hello World,1received message: Hi,2received message: Yay!,3

On the WebSocketClientFlow console, this prints:

Received text message: [TextMessage.Strict(hello world! )]
Received text message: [TextMessage.Strict(hello world! )]
Received text message: [TextMessage.Strict(hello world! )]
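
A likely explanation for why the original client printed "closed" without ever printing a reply: Source.single completes as soon as it has emitted its one element, and the Akka HTTP documentation notes that its WebSocket API does not support half-closed connections, so completing either side closes the whole connection. Source.actorRef above avoids this because it stays open. Below is a minimal sketch of another way to keep the outgoing side open, using Source.maybe. The names KeepOpenClient, keepOpen and closeSignal are hypothetical and only for illustration; the URL is the one from the question.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Future, Promise}

// Hypothetical object name, for illustration only
object KeepOpenClient {

  // Emits the given messages, then stays open until the materialized
  // promise is completed (Source.maybe never completes on its own).
  def keepOpen(messages: List[Message]): Source[Message, Promise[Option[Message]]] =
    Source(messages).concatMat(Source.maybe[Message])(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Handle strict text messages and fall back for anything else, so an
    // unexpected message type does not fail the stream with a MatchError.
    // (A real client should also drain streamed messages.)
    val incoming: Sink[Message, Future[Done]] = Sink.foreach[Message] {
      case TextMessage.Strict(text) => println(s"Received: $text")
      case other                    => println(s"Received non-strict message: $other")
    }

    val webSocketFlow =
      Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/ws"))

    val ((closeSignal, upgradeResponse), closed) =
      keepOpen(List(TextMessage("hello world!")))
        .viaMat(webSocketFlow)(Keep.both) // keep the promise and the upgrade response
        .toMat(incoming)(Keep.both)       // also keep the Future[Done]
        .run()

    upgradeResponse.foreach(upgrade => println(s"Upgrade status: ${upgrade.response.status}"))

    // Completing closeSignal (closeSignal.success(None)) would end the
    // outgoing side and thereby close the connection.
    closed.onComplete(_ => system.terminate())
  }
}
```

With this variant the client keeps running until closeSignal is completed or the server closes the connection, so the reply from MyWebSocketActor has time to arrive.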