我是 Kafka 和 PySpark+Structured Streaming 的新手,我们需要从 Kafka 主题流式传输数据并在数据经历多次转换时摄取到另一个表中。 我了解检查点的概念、用途和优势。但是让我感到困惑的一件事是使用它的正确位置是什么? ReadStream 还是 WriteStream?
比如我是这样看书的:
def read_from_kafka(spark: SparkSession, kafka_config: dict, topic_name: str, column_schema: str, checkpoint_location: str):
stream_df = spark.readStream.format('kafka') \
.option('kafka.bootstrap.servers', kafka_config['broker']) \
.option('subscribe', topic_name) \
.option('kafka.security.protocol', kafka_config['security_protocol']) \
.option('kafka.sasl.mechanism', kafka_config['sasl_mechanism']) \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', kafka_config['sasl_login_callback_handler_class']) \
.option('startingOffsets', 'earliest') \
.option("maxOffsetsPerTrigger", kafka_config['max_offsets_per_trigger']) \
.option('checkpointLocation', checkpoint_location) \
.load() \
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
return stream_df
stream_df = read_from_kafka(spark, config, topic_name, schema, checkpoint_location)
并将相同的 df 写入如下所示的某个位置。
def write_data(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str):
kafka_df.writeStream \
.format('kafka') \
.foreachBatch(some_func) \
.option('checkpointLocation', checkpoint_location) \
.start()
.awaitTermination()
def some_fun(kafka_df: DataFrame, batch_id: int):
kafka_df.write.parquet(s'some_path_{batch_id}')
我的困惑是我是否需要在读取和写入流上都设置检查点?我看到一些文档只在
WriteStream()
使用检查点
如果我只在WriteStream()
使用它而不在ReadStream()
使用它,Kafka + SparkStream怎么知道应该从主题的哪个偏移量读取恢复? ReadStream()
和WriteStream()
都用会有什么影响吗?
谁能告诉我在从 Kafka 主题读取和写入数据时使用检查点的正确位置是什么?
Spark在消费时可以使用Kafka本身存储的消费者组偏移量,因此不需要检查点。
检查点存储您执行的操作的状态,而不仅仅是 Kafka 偏移量。
如果它没有对您的应用程序产生负面影响,那么检查点。