我正在尝试使用 psypark 结构化流来消耗运动数据流记录。 我正在尝试在 aws 胶水批处理作业中运行此代码。我的目标是使用检查点并将检查点和数据保存到 s3。我能够使用数据,但它只为每个触发器提供很少的记录,而运动数据流有很多记录。我正在使用 TRIM_HORIZON,它是 earliest 的别名并触发 spark.writestream 一次,以便它执行一次并关闭集群。当我再次运行该作业时,它会从检查点选择最新的偏移量并运行。
kinesis = spark.readStream.format('kinesis') \
.option('streamName', kinesis_stream_name) \
.option('endpointUrl', 'blaablaa')\
.option('region', region) \
.option('startingPosition', 'TRIM_HORIZON')\
.option('maxOffsetsPerTrigger',100000)\
.load()
// 在这里做一些转换
TargetKinesisData = stream_data.writeStream.format("parquet").outputMode('append').option(
"path", s3_target).option("checkpointLocation", checkpoint_location).trigger(once=True).start().awaitTermination()
简答
spark checkpoint location 参数存储有关它成功处理的先前记录的信息。因此,当相同的记录再次出现时,它不会处理它们。如果您更改/删除检查点位置并重新运行该作业,它将消耗您流中的所有数据。
详情
编写 Spark Streaming 作业时,有两个选项决定在重新启动流式作业时将读取哪些数据。