在生产中,我们的应用程序生成了以下堆栈跟踪。这里没有我们自己的代码 - 没有太多可继续的。
了解
akka-stream
库的人可以解释一下吗:
播放版本:2.8.18 阿卡版本:2.6.20
2023-11-06 09:20:05 GMT [ERROR] p.c.s.c.WebSocketFlowHandler [WebSocketFlowHandler.scala:249] - WebSocket flow threw exception
java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.11)
at akka.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:62)
at akka.stream.impl.VirtualPublisher.rec$6(StreamLayout.scala:481)
at akka.stream.impl.VirtualPublisher.subscribe(StreamLayout.scala:486)
at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:148)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:770)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:788)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
如果没有看到你的代码(即你想要实现的目标),就很难知道出了什么问题。但它表明,在某个地方,您已经获取了由
Publisher
生成的 Sink.asPublisher
,并尝试订阅它两次。这是不可能的,因为 Sink.asPublisher
返回的发布者代表流的一个正在运行的实例,并且如果您第二次订阅,则无法为第二个订阅者重新启动该流。
从概念上思考,当您收到 WebSocket 连接,并且想要使用传入的消息流时,您可以提供一个接收器来执行此操作。
Sink.asPublisher
将该流转换为反应式流发布者,您可以通过其他反应式流库使用来自它的消息。当您订阅它时,提供的订阅者将连接到现有的 WebSocket 连接。但是,如果您再次尝试从该发布者那里订阅,它会做什么呢? WebSocket 服务器无法返回到 WebSocket 客户端并说“嘿,你可以再次连接到我,以便我可以将另一个消息流发送给这个新订阅者吗?”不行,只能失败,说不能订阅两次。