我有一个流,在某个时候它将对对象进行分组以创建文件。我想我可以通过在流的早期对对象进行序列化来压缩一些字节。但是我最大的问题是关于如何优化像这样的流的内存占用量:
val sourceOfCustomer = Source.repeat(Customer(name = "test"))
def serializeCustomer(customer: Customer) = customer.toString
sourceOfCustomers
.via(serializeCustomer) // 1KB
.grouped(1000000) // 1GB
.via(processFile) // 1GB
.via(moreProcessing) // 1GB
.via(evenMoreProcessing) // 1GB
.to(fileSink) // 1GB
这使我在稳定状态下的内存使用量为[[至少5GB。这是正确的吗?
我可以使用什么策略将其限制为1GB或2GB?原则上,应该通过折叠运算符来实现。注意:我知道一种解决方案是使小组更小,但让我们考虑小组的大小是问题的约束。
group
操作,我想你是说grouped
操作:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/grouped.html。[如果是这样,则意味着在.grouped(1000000) // 1GB
处,您可以在流中创建一组元素,这些元素可以同时处理,因此,在某一时刻,内存中可以再出现一组1GB大小的元素。因此,为了将流中的内存占用量限制为最大1GB,可以采用以下两种方法之一:
1)减少同时处理的大型小组的数量。这可以通过throttle
操作来实现:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/throttle.html#throttle请查看示例代码段
import scala.concurrent.duration._
...
.group(1000000) // 1GB
.throttle(1, 1 minute)
2)减少大团体人数>
val parallelismLevel = Runtime.getRuntime.availableProcessors() // or another custom level which represents stream processing parallelism val baseGroupSize = 1000000 // 1GB val groupSize = baseGroupSize / parallelismLevel sourceOfCustomers .via(serializeCustomer) // 1KB .group(groupSize)
希望这会有所帮助!