我正在将 Glue Streaming 作业与 Kinesis Data Stream 结合使用。我希望我的胶水作业始终从最后一个未处理的记录中读取(以防作业出现故障并重新启动)。
相关代码-
df = glueContext.create_data_frame.from_option(
connection_type="kinesis",
connection_options={
... # other properties
"startingPosition": "TRIM_HORIZON"
}
)
glueContext.forEachBatch(
frame=df,
batch_function=batch_fn,
options={
"checkpointLocation": "s3://....",
...
}
)
我的理解是
TRIM_HORIZON
会让胶水作业从一开始就读取数据。
AWS Glue 文档 说 -
AWS Glue 流作业使用检查点而不是作业书签来跟踪已读取的数据。
这让我认为流式粘合作业正在记录上次读取的数据。
我的问题是上面的组合有什么作用?它会从头开始读取但跳过已经处理过的数据吗?
TRIM_HORIZON
表示您想要从 Kinesis 流中最早的可用记录开始读取。这意味着您的 AWS Glue 作业将从头开始处理流中的所有记录。
你需要这样的东西:
df = glueContext.create_data_frame.from_option(
connection_type="kinesis",
connection_options={
... # other properties
"startingPosition": "AFTER_SEQUENCE_NUMBER",
"startingSequenceNumber": starting_sequence_number,
"streamReaderOptions": {
"checkpointLocation": "s3://checkpoint-location"
}
}
)
AFTER_SEQUENCE_NUMBER
表示您将从检查点读取。并且可以从 s3 对象获取该检查点 s3://checkpoint-location/starting_sequence_number.json
更多详细信息请参见此处 使用分片迭代器