从数据库加载大量记录时内存不足

问题描述 投票:2回答:1

我在Akka Streams中使用slick从数据库(postgresql)加载大量记录(~2M)并将它们写入S3文件。但是,我注意到下面的代码适用于大约50k左右的记录,但是对于大约100k左右的记录都没有。

  val allResults: Future[Seq[MyEntityImpl]] =
    MyRepository.getAllRecordss()

  val results: Future[MultipartUploadResult] = Source
    .fromFuture(allResults)
    .map(seek => seek.toList)
    .mapConcat(identity)
    .map(myEntity => myEntity.toPSV + "\n")
    .map(s => ByteString(s))
    .runWith(s3Sink)

以下是myEntity的样子:

case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
  def toPSV: String = myPartOne.toPSV + myPartTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = {s"$field1|"+s"$field2"}
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = {s"$field1|"+s"$field2"}
}

我正在寻找一种方法来以更加反应的方式执行此操作,以便它不会耗尽内存。

scala akka slick akka-stream reactive-streams
1个回答
1
投票

潜在的问题

问题是您将所有记录从数据库中提取到本地内存中,然后再将它们分发到s3Sink

数据被拉入内存的第一个位置可能是你的MyRepository.getAllRecords()方法。大多数(如果不是全部)Seq实现都是基于内存的。你肯定使用本地内存的第二个地方是seek.toList,因为List将所有数据存储在内存中。

而不是从Seq getAllRecords返回you should be returning a slick-based akka Source directly。这将确保您的物化流只需要内存用于瞬态处理步骤,然后再转到s3。

如果您的方法定义更改为:

def getAllRecords() : Source[MyEntityImpl, _]

然后流的其余部分将以被动方式运行:

MyRepository
  .getAllRecords()
  .map(myEntity => myEntity.toPSV + "\n")
  .map(ByteString.apply)
  .runWith(s3Sink)
© www.soinside.com 2019 - 2024. All rights reserved.