如何减少Akka流中的内存使用量

问题描述 投票:2回答:1

我有一个流,在某个时候它将对对象进行分组以创建文件。我想我可以通过在流的早期对对象进行序列化来压缩一些字节。但是我最大的问题是关于如何优化像这样的流的内存占用量:

val sourceOfCustomer = Source.repeat(Customer(name = "test"))
def serializeCustomer(customer: Customer) = customer.toString

sourceOfCustomers
.via(serializeCustomer) // 1KB
.grouped(1000000) // 1GB
.via(processFile) // 1GB
.via(moreProcessing) // 1GB
.via(evenMoreProcessing) // 1GB
.to(fileSink) // 1GB

这使我在稳定状态下的内存使用量为[[至少5GB。这是正确的吗?

我可以使用什么策略将其限制为1GB或2GB?原则上,应该通过折叠运算符来实现。

注意:我知道一种解决方案是使小组更小,但让我们考虑小组的大小是问题的约束。

scala akka akka-stream
1个回答
4
投票
[对不起,也许我丢失了一些东西,但是我在最新的Akka Stream文档中没有找到group操作,我想你是说grouped操作:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/grouped.html

[如果是这样,则意味着在.grouped(1000000) // 1GB处,您可以在流中创建一组元素,这些元素可以同时处理,因此,在某一时刻,内存中可以再出现一组1GB大小的元素。因此,为了将流中的内存占用量限制为最大1GB,可以采用以下两种方法之一:

1)减少同时处理的大型小组的数量。这可以通过throttle操作来实现:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/throttle.html#throttle请查看示例代码段

import scala.concurrent.duration._ ... .group(1000000) // 1GB .throttle(1, 1 minute)

2)减少大团体人数>

val parallelismLevel = Runtime.getRuntime.availableProcessors() // or another custom level which represents stream processing parallelism val baseGroupSize = 1000000 // 1GB val groupSize = baseGroupSize / parallelismLevel sourceOfCustomers .via(serializeCustomer) // 1KB .group(groupSize)

希望这会有所帮助!
© www.soinside.com 2019 - 2024. All rights reserved.