Spark Structured Stream - Kinesis 作为数据源

问题描述 投票:0回答:1

我正在尝试使用 psypark 结构化流来消耗运动数据流记录。 我正在尝试在 aws 胶水批处理作业中运行此代码。我的目标是使用检查点并将检查点和数据保存到 s3。我能够使用数据,但它只为每个触发器提供很少的记录,而运动数据流有很多记录。我正在使用 TRIM_HORIZON,它是 earliest 的别名并触发 spark.writestream 一次,以便它执行一次并关闭集群。当我再次运行该作业时,它会从检查点选择最新的偏移量并运行。

kinesis = spark.readStream.format('kinesis') \
        .option('streamName', kinesis_stream_name) \
        .option('endpointUrl', 'blaablaa')\
        .option('region', region) \
        .option('startingPosition', 'TRIM_HORIZON')\
        .option('maxOffsetsPerTrigger',100000)\
        .load()

// 在这里做一些转换

TargetKinesisData = stream_data.writeStream.format("parquet").outputMode('append').option(
        "path", s3_target).option("checkpointLocation", checkpoint_location).trigger(once=True).start().awaitTermination()
pyspark aws-glue spark-structured-streaming amazon-kinesis
1个回答
0
投票

简答

spark checkpoint location 参数存储有关它成功处理的先前记录的信息。因此,当相同的记录再次出现时,它不会处理它们。如果您更改/删除检查点位置并重新运行该作业,它将消耗您流中的所有数据。

详情

编写 Spark Streaming 作业时,有两个选项决定在重新启动流式作业时将读取哪些数据。

  1. 起始位置参数:此参数将根据共享的参数(即最早、最新或基于时间戳)从 AWS Kinesis 获取结果。阅读以下docs以了解用法。
  2. Spark Stream Checkpoint:此参数存储有关哪些记录已成功处理的信息,这有助于提供一致性并避免重新读取旧记录。阅读这里关于火花流容错
© www.soinside.com 2019 - 2024. All rights reserved.