将Akka流传递到上游服务以进行填充

问题描述 投票:6回答:1

我需要调用上游服务(Azure Blob服务)以将数据推送到OutputStream,然后需要通过akka转过来并将其推送回客户端。没有akka(只有servlet代码),我只需要获取ServletOutputStream并将其传递给azure服务的方法即可。

我可以尝试偶然发现的最接近的位置,显然这是错误的,是这样的

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

想法是我正在调用上游服务,以通过调用来填充输出流blobClient.download(os);

似乎lambda函数被调用并返回,但是随后它失败了,因为没有数据或其他东西。好像我不应该让那个lambda函数来完成这项工作,但是也许返回一些可以完成这项工作的对象?不确定。

如何做到这一点?

java stream akka akka-stream
1个回答
0
投票

在这种情况下,OutputStreamSource的“物化值”,只有在流运行时(或“物化”为正在运行的流)才创建。由于您将Source交给Akka HTTP,因此无法运行它,而稍后它将实际运行您的源代码。

.mapMaterializedValue(matval -> ...)通常用于转换实现值,但是由于它是实现的一部分而被调用,因此您可以使用它来产生副作用,例如在消息中发送matval,就像您已经知道的那样,即使看起来很时髦,也不一定有什么错。重要的是要了解,在lambda完成之前,流不会完成其实现并开始运行。如果download()阻塞而不是分叉另一个线程上的某些工作并立即返回,则意味着出现问题。

但是还有另一种解决方案:Source.preMaterialize(),它实现了源,并为您提供了实现值的Pair和可用于消耗已经启动的源的新Source

Pair<OutputStream, Source<ByteString, NotUsed>> pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();

[请注意,您的代码中还有一些其他事情要考虑,最重要的是,如果blobClient.download(os)调用阻塞直到完成,并且您从actor进行调用,那么在这种情况下,您必须确保actor不会使调度程序饿死,并阻止应用程序中的其他参与者执行(请参阅Akka docs:https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management)。

© www.soinside.com 2019 - 2024. All rights reserved.