使用 Kafka 进行 Spark Structured Streaming 时我们什么时候需要检查点?在 ReadStream 或 Write Stream 中?

问题描述 投票:0回答:1

我是 Kafka 和 PySpark+Structured Streaming 的新手,我们需要从 Kafka 主题流式传输数据并在数据经历多次转换时摄取到另一个表中。 我了解检查点的概念、用途和优势。但是让我感到困惑的一件事是使用它的正确位置是什么? ReadStream 还是 WriteStream?

比如我是这样看书的:

def read_from_kafka(spark: SparkSession, kafka_config: dict, topic_name: str, column_schema: str, checkpoint_location: str):
    stream_df = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', kafka_config['broker']) \
        .option('subscribe', topic_name) \
        .option('kafka.security.protocol', kafka_config['security_protocol']) \
        .option('kafka.sasl.mechanism', kafka_config['sasl_mechanism']) \
        .option('kafka.sasl.jaas.config', jass_config) \
        .option('kafka.sasl.login.callback.handler.class', kafka_config['sasl_login_callback_handler_class']) \
        .option('startingOffsets', 'earliest') \
        .option("maxOffsetsPerTrigger", kafka_config['max_offsets_per_trigger']) \
        .option('checkpointLocation', checkpoint_location) \
        .load() \
        .select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
    return stream_df

stream_df = read_from_kafka(spark, config, topic_name, schema, checkpoint_location)

并将相同的 df 写入如下所示的某个位置。

def write_data(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str):
    kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(some_func) \
        .option('checkpointLocation', checkpoint_location) \
        .start()
        .awaitTermination()

def some_fun(kafka_df: DataFrame, batch_id: int):
    kafka_df.write.parquet(s'some_path_{batch_id}')

我的困惑是我是否需要在读取和写入流上都设置检查点?我看到一些文档只在

WriteStream()
使用检查点 如果我只在
WriteStream()
使用它而不在
ReadStream()
使用它,Kafka + SparkStream怎么知道应该从主题的哪个偏移量读取恢复?
ReadStream()
WriteStream()
都用会有什么影响吗?

谁能告诉我在从 Kafka 主题读取和写入数据时使用检查点的正确位置是什么?

apache-spark apache-kafka spark-structured-streaming
1个回答
0
投票

Spark在消费时可以使用Kafka本身存储的消费者组偏移量,因此不需要检查点。

检查点存储您执行的操作的状态,而不仅仅是 Kafka 偏移量。

如果它没有对您的应用程序产生负面影响,那么检查点。

© www.soinside.com 2019 - 2024. All rights reserved.