使用 Play Framework 以分块(chunked)方式返回 akka-stream 的 Source 时,为什么会抛出 java.lang.IllegalStateException?

问题描述 投票:0回答:0

我使用 Play Framework 返回 mp3 音频,并希望使用分块传输编码(chunked transfer encoding)。为此我用 akka-stream 创建了一个 Source 作为响应体,并通过 Play 的 Ok.chunked 返回结果,但请求时抛出了异常。

Play 版本为 2.5.18,akka-stream 版本为 2.4.20。异常如下:

[ERROR 2023-08-02 00:13:13,223] [application-akka.actor.default-dispatcher-32 c.x.a.c.DefaultErrorHandler] - A server error occurred for speech-staging.ai.srv:speech-staging.ai
.srv/stream_audio_v2,from: 10.38.162.91
play.core.server.common.ServerResultException: Error converting Play Result for server backend
        at play.core.server.common.ServerResultUtils$.play$core$server$common$ServerResultUtils$$handleConversionError$1(ServerResultUtils.scala:108)
        at play.core.server.common.ServerResultUtils$.resultConversionWithErrorHandling(ServerResultUtils.scala:129)
        at play.core.server.netty.NettyModelConversion.convertResult(NettyModelConversion.scala:251)
        at play.core.server.netty.PlayRequestHandler$$anonfun$play$core$server$netty$PlayRequestHandler$$handleAction$2$$anonfun$apply$4$$anonfun$apply$5.apply(PlayRequestHandl
er.scala:273)
        at play.core.server.netty.PlayRequestHandler$$anonfun$play$core$server$netty$PlayRequestHandler$$handleAction$2$$anonfun$apply$4$$anonfun$apply$5.apply(PlayRequestHandl
er.scala:267)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at play.api.libs.iteratee.Execution$trampoline$.executeScheduled(Execution.scala:109)
        at play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:71)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
        at scala.concurrent.Future$$anonfun$flatMap$1$$anonfun$apply$3.apply(Future.scala:256)
        at scala.concurrent.Future$$anonfun$flatMap$1$$anonfun$apply$3.apply(Future.scala:256)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at scala.concurrent.impl.Promise$KeptPromise.onComplete(Promise.scala:337)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:256)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:256)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: internal error
        at akka.stream.impl.VirtualPublisher.registerPublisher(StreamLayout.scala:832)
        at akka.stream.impl.MaterializerSession.akka$stream$impl$MaterializerSession$$doSubscribe(StreamLayout.scala:1034)
        at akka.stream.impl.MaterializerSession$$anonfun$materialize$3$$anonfun$apply$3.apply(StreamLayout.scala:925)
        at akka.stream.impl.MaterializerSession$$anonfun$materialize$3$$anonfun$apply$3.apply(StreamLayout.scala:924)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at akka.stream.impl.MaterializerSession$$anonfun$materialize$3.apply(StreamLayout.scala:924)
        at akka.stream.impl.MaterializerSession$$anonfun$materialize$3.apply(StreamLayout.scala:924)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at akka.stream.impl.MaterializerSession.materialize(StreamLayout.scala:924)
        at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:256)
        at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:146)
        at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:350)
        at akka.stream.scaladsl.Source.runWith(Source.scala:81)
        at play.core.server.netty.NettyModelConversion.play$core$server$netty$NettyModelConversion$$createChunkedResponse(NettyModelConversion.scala:272)
        at play.core.server.netty.NettyModelConversion$$anonfun$convertResult$1.apply(NettyModelConversion.scala:205)
        at play.core.server.netty.NettyModelConversion$$anonfun$convertResult$1.apply(NettyModelConversion.scala:182)
        at play.core.server.common.ServerResultUtils$.resultConversionWithErrorHandling(ServerResultUtils.scala:127)
        ... 41 common frames omitted

我的代码是:

/** Controller that answers with an MP3 byte stream sent as a chunked HTTP response.
  *
  * @param actorSystem actor system used to spawn the per-request [[StreamActor]]
  * @param exec        execution context supplied by the injector
  */
class StreamTest @Inject()(actorSystem: ActorSystem)(implicit exec: ExecutionContext) extends Controller {

    /** Builds an actor-backed source and returns it as a chunked `audio/mp3` result.
      *
      * When the source is materialized, a [[StreamActor]] is created on the
      * "assist-dispatcher" dispatcher, handed the source's actor ref, and told to
      * start with the "stream_audio_v2" message.
      */
    def stream_audio_v2(): Action[AnyContent] = Action.async { _ =>
        // Buffer of up to 10000 elements; the stream fails if the buffer overflows.
        val actorBacked = Source.actorRef[ByteString](10000, OverflowStrategy.fail)

        val audio = actorBacked.mapMaterializedValue { sink =>
            val props = Props(new StreamActor(sink)).withDispatcher("assist-dispatcher")
            actorSystem.actorOf(props) ! "stream_audio_v2"
        }

        val result = Ok.chunked(audio).as("audio/mp3")
        Future.successful(result)
    }
}
/** Actor that pumps bytes from `StreamActor.getSource()` into `outActor`.
  *
  * NOTE(review): `log`, the implicit `ExecutionContext` required by `Future { ... }`
  * and the companion `StreamActor.getSource()` are not defined in this snippet;
  * they are assumed to be provided elsewhere (e.g. `ActorLogging`, an import) -
  * confirm before relying on this class as-is.
  *
  * @param outActor receiver of the `ByteString` chunks, followed by a terminal
  *                 `akka.actor.Status.Success` or `akka.actor.Status.Failure`
  */
class StreamActor(outActor: ActorRef = null) extends Actor {
    override def receive: Receive = {

        case "stream_audio_v2" =>
            log.info("receive stream_audio_v2")
            val out = outActor

            log.info("out put streamProducer.stream")

            Future {
                try {
                    val source = StreamActor.getSource()
                    try {
                        // The array is reused across reads; every chunk is copied out of it.
                        val buffer = new Array[Byte](1024)
                        Iterator
                            .continually(source.read(buffer))
                            .takeWhile(_ != -1) // -1 marks end of stream: nothing is emitted for it
                            .foreach { length =>
                                // Forward only the bytes actually read. Sending the whole
                                // array would append stale bytes after a short read.
                                if (length > 0) out ! ByteString.fromArray(buffer, 0, length)
                                // Pacing kept from the original loop between consecutive reads.
                                Thread.sleep(1)
                            }
                    } finally {
                        // Release the input even when read() throws.
                        source.close()
                    }
                    out ! akka.actor.Status.Success(())
                } catch {
                    // Report the error to the receiver instead of dropping it with the
                    // discarded Future, so the consumer is not left waiting forever.
                    case scala.util.control.NonFatal(e) =>
                        out ! akka.actor.Status.Failure(e)
                }
            }
    }
}

我也尝试用 HttpEntity.Streamed(流式实体)代替 Ok.chunked,但没有效果。

    /** Variant that sends the audio through `HttpEntity.Streamed` instead of `Ok.chunked`.
      *
      * The entity is built with no content length (`None`) and the content type
      * "audio/mp3". The source is fed by a [[StreamActor]] that is spawned on the
      * "assist-dispatcher" dispatcher when the source is materialized.
      */
    def stream_audio_v2_5(): Action[AnyContent] = Action.async { _ =>
        // Buffer of up to 10000 elements; the stream fails if the buffer overflows.
        val actorBacked = Source.actorRef[ByteString](10000, OverflowStrategy.fail)

        val audio = actorBacked.mapMaterializedValue { sink =>
            val props = Props(new StreamActor(sink)).withDispatcher("assist-dispatcher")
            actorSystem.actorOf(props) ! "stream_audio_v2"
        }

        val entity = HttpEntity.Streamed(audio, None, Some("audio/mp3"))
        Future.successful(Ok.sendEntity(entity))
    }

这种写法也抛出了相同的异常。

我真的希望你能帮助我,谢谢。

scala playframework akka-stream
© www.soinside.com 2019 - 2024. All rights reserved.