Apache Beam Java 流管道中出现 OOM 错误

问题描述 投票:0回答:1

我的数据流作业流管道设计如下。

  1. 从扳手更改流中读取更新/插入的行作为数据更改记录
  2. 如果插入 datachangerecord mod 则创建表行
  3. 如果 datachangerecord mod 更新而不是收集主键作为 Pcollection(因为我需要完整的行插入 GBQ,datachangerocord newJson 只给我更新的列。不是所有列)
  4. 在更新键值 Pcollection 上应用 5000 大小的 GroupIntoBatches,MaxBufferingDuration 为 5000ms 这会将我的更新 Pcollection 分成批次,每批次包含 5000 个密钥。
  5. 使用spanner DatabaseClient再次调用spanner,这里使用
    In
    查询,这样对于5000个更新密钥,它将进行单个spanner调用
  6. 比为扳手更新结果集创建表行。
  7. 将插入表行(来自步骤 2)和更新表行展平为单个 pcollection
  8. 发布到 pubsub 主题以更新和插入表行
  9. 将更新和插入表行写入GBQ

这是一个流作业,由于 Spanner 更新调用而导致 OOM(如果我删除步骤 3 到 6,则不会出现 OOM)

作业分析器给出以下痕迹

来自分析器

java arrays copyofrange 
占用更多内存,下面是作业日志

generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:

  1. 如何解决这个错误?
  2. 有什么方法可以从扳手更改流中获取完整的行,这样我就可以避免扳手调用?
java google-cloud-platform apache-beam google-cloud-spanner
1个回答
1
投票

正如 @John Hanley 所说,

java.lang.OutOfMemoryError: unable to create native thread
可以通过减少内存使用来优化管道以及为工作线程或每个线程分配更多可用内存来解决。

此外,请参阅此数据流管道图以可视化故障排除工作流程。

image

© www.soinside.com 2019 - 2024. All rights reserved.