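We have a batch Cloud Dataflow pipeline job that had been running fine for a long time and suddenly started failing. Based on the data volume and historical runs, it normally completes in ~10 minutes. Recently it started taking ~8 hours, never completes, and throws a large number of errors.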
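The basic structure of the pipeline is:
Read from BigQuery -> locate the data in GCS -> do some processing -> write the data to BigQuery, into different tables in another project.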
One of the errors I see is:
java.lang.RuntimeException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/test123-development/datasets/datasetmy/tables/extract979_cknkc_failure/streams/Cic2NTM3NDA3NS0wMDAwLTJiMDQtODczMC1kNGY1NDdlNjZjNDQ6czI
at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:270)
at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:250)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:435)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:495)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/test123-development/datasets/datasetmy/tables/extract979_cknkc_failure/streams/Cic2NTM3NDA3NS0wMDAwLTJiMDQtODczMC1kNGY1NDdlNjZjNDQ6czI
at com.google.cloud.bigquery.storage.v1.StreamWriter.requestCallback(StreamWriter.java:574)
at com.google.cloud.bigquery.storage.v1.StreamWriter.access$1000(StreamWriter.java:49)
at com.google.cloud.bigquery.storage.v1.StreamWriter$2.run(StreamWriter.java:225)
at com.google.cloud.bigquery.storage.v1.StreamConnection$1.onResponse(StreamConnection.java:63)
at com.google.cloud.bigquery.storage.v1.StreamConnection$1.onResponse(StreamConnection.java:54)
at com.google.api.gax.tracing.TracedResponseObserver.onResponse(TracedResponseObserver.java:90)
at com.google.api.gax.grpc.ExceptionResponseObserver.onResponseImpl(ExceptionResponseObserver.java:74)
at com.google.api.gax.rpc.StateCheckingResponseObserver.onResponse(StateCheckingResponseObserver.java:62)
at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onMessage(GrpcDirectStreamController.java:134)
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
at io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(DelayedClientCall.java:447)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:661)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:646)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
I don't understand "Please refer to the row_errors field for details". Where can I find row_errors, or how do I enable them?
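For context: in the BigQuery Storage Write API, each append response can carry a row_errors list. Every entry holds the index of the failing row within the batch that was appended, an error code, and a message. In recent Beam releases, rows rejected this way may also be exposed through WriteResult.getFailedStorageApiInserts(), but whether that applies depends on your Beam version. The sketch below does not use the real client classes; RowError, resolveRowErrors and the sample data are hypothetical, and it only illustrates how those indices point back to the rows that were sent.

```java
import java.util.ArrayList;
import java.util.List;

public class RowErrorLookup {
    // Hypothetical stand-in for one row_errors entry: index of the failing row
    // within the appended batch, an error code, and a message.
    public record RowError(long index, String code, String message) {}

    // Pairs each reported error with the row that was sent at that index.
    public static List<String> resolveRowErrors(List<String> batch, List<RowError> rowErrors) {
        List<String> report = new ArrayList<>();
        for (RowError err : rowErrors) {
            long i = err.index();
            // Guard against an index that does not fall inside this batch.
            String row = (i >= 0 && i < batch.size()) ? batch.get((int) i) : "<index out of range>";
            report.add("row " + i + " [" + err.code() + "] " + err.message() + ": " + row);
        }
        return report;
    }

    public static void main(String[] args) {
        // Illustrative data only; not taken from the failing job.
        List<String> batch = List.of("{\"id\":1}", "{\"id\":\"abc\"}", "{\"id\":3}");
        List<RowError> errors = List.of(new RowError(1, "FIELDS_ERROR", "illustrative message"));
        resolveRowErrors(batch, errors).forEach(System.out::println);
    }
}
```

Because the index is relative to a single append request, it is only meaningful together with the batch that produced it.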
I tried running the same pipeline locally, and it works fine.
I am writing to BigQuery with the following method:
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;

// TestClass and tableName are defined elsewhere in the pipeline.
public static BigQueryIO.Write<TestClass> write() {
    return BigQueryIO.<TestClass>write()
            .to(tableName)
            .withFormatFunction(TestClass::toTableRow)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
}
Please let me know if any other details are needed to investigate further.
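The error is not self-explanatory and lacks context. It was also hard to get the exact contents of the row_errors field. Eventually I realized that I was writing to two tables (a success table and a failure table), and the schema generated for one of them in our custom Dataflow job was wrong.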
After I updated the schema to match the destination table, the job worked.
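Since the failure came from rows whose generated schema did not match the destination table, one way to surface the problem before the sink is to check each row against the destination schema. This is a minimal sketch under assumptions: rows are plain Map<String, Object> values and the schema is a hypothetical field-name-to-type map using a few BigQuery type names. A real pipeline would work with TableRow and TableSchema instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RowSchemaCheck {
    // Returns human-readable problems; an empty list means the row looks compatible.
    public static List<String> validate(Map<String, Object> row, Map<String, String> schema) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String type = schema.get(e.getKey());
            if (type == null) {
                problems.add("unknown field: " + e.getKey());
            } else if (e.getValue() != null && !matches(type, e.getValue())) {
                problems.add("field " + e.getKey() + " expected " + type
                        + " but got " + e.getValue().getClass().getSimpleName());
            }
        }
        return problems;
    }

    // Covers only a small subset of BigQuery types, for illustration.
    private static boolean matches(String type, Object value) {
        return switch (type) {
            case "STRING" -> value instanceof String;
            case "INT64" -> value instanceof Long || value instanceof Integer;
            case "FLOAT64" -> value instanceof Double || value instanceof Float;
            case "BOOL" -> value instanceof Boolean;
            default -> true; // other types are not checked in this sketch
        };
    }

    public static void main(String[] args) {
        Map<String, String> schema = Map.of("id", "INT64", "name", "STRING");
        System.out.println(validate(Map.of("id", 1L, "name", "a"), schema)); // prints []
        System.out.println(validate(Map.of("id", "x", "extra", true), schema));
    }
}
```

The sketch does not check for missing REQUIRED fields or nested RECORD columns. Rows that fail the check could be routed to a separate output instead of reaching the BigQuery write.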
After some deeper searching I found this GitHub link, which describes a similar idea:
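When the error message contains the following text:
INVALID_ARGUMENT: Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: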
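First: check whether the "target table data handling" policy is set to keep the original structure of the target table;
Second: check the generated table schema;
Third: check whether the table exists in BigQuery, and check its structure;
Fourth: check whether the table schema generated by the job is consistent with the structure of the same-named table that exists in BigQuery, including field names and field types;
Fifth: if they are inconsistent, then to avoid problems caused by the mismatch between the newly generated schema and the same-named target table, it is recommended to change the "data handling" policy to "clear target table structure and data".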
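The fourth step (comparing field names and types between the generated schema and the existing table) can be automated. A minimal sketch, assuming both schemas have already been reduced to hypothetical field-name-to-type maps; fetching the real table schema from BigQuery is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class SchemaDiff {
    // Compares two schemas given as field name -> type and lists every difference.
    public static List<String> diff(Map<String, String> generated, Map<String, String> target) {
        List<String> diffs = new ArrayList<>();
        // TreeSet gives a stable, sorted order for the report.
        TreeSet<String> names = new TreeSet<>(generated.keySet());
        names.addAll(target.keySet());
        for (String name : names) {
            String g = generated.get(name);
            String t = target.get(name);
            if (g == null) {
                diffs.add("missing in generated schema: " + name + " (" + t + ")");
            } else if (t == null) {
                diffs.add("missing in target table: " + name + " (" + g + ")");
            } else if (!g.equalsIgnoreCase(t)) {
                diffs.add("type mismatch for " + name + ": generated " + g + ", target " + t);
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        Map<String, String> generated = Map.of("id", "INT64", "reason", "STRING");
        Map<String, String> target = Map.of("id", "STRING", "reason", "STRING", "ts", "TIMESTAMP");
        // prints:
        // type mismatch for id: generated INT64, target STRING
        // missing in generated schema: ts (TIMESTAMP)
        diff(generated, target).forEach(System.out::println);
    }
}
```

Type names are compared literally (ignoring case), so legacy aliases such as INTEGER vs INT64 would need to be normalized first to avoid false mismatches.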