How do I fix expiring record batches when using Spark Streaming + Kafka?


I am trying to read data from a Kafka topic using foreachBatch() as shown below.

def write_stream_batches(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str, kafkaconfig: dict):
    query = kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(join_kafka_streams) \
        .option('checkpointLocation', checkpoint_location) \
        .start()
    query.awaitTermination()

def join_kafka_streams(kafka_df: DataFrame, batch_id: int):
    main_df = spark.sql('select * from table where some_filter_including_partitions')
    join_df = kafka_df.join(main_df, ['key_col1', 'key_col2', 'key_col3', 'key_col4'], 'inner')
    join_df.write.format('kafka') \
        .option('kafka.bootstrap.servers', kafkaconfig['kafka_broker']) \
        .option('kafka.batch.size', kafkaconfig['kafka_batch_size']) \
        .option('retries', kafkaconfig['retries']) \
        .option('kafka.max.request.size', kafkaconfig['kafka_max_request_size']) \
        .option('kafka.max.block.ms', kafkaconfig['kafka_max_block_ms']) \
        .option('kafka.metadata.max.age.ms', kafkaconfig['kafka_metadata_max_age_ms']) \
        .option('kafka.request.timeout.ms', kafkaconfig['kafka_request_timeout_ms']) \
        .option('kafka.linger.ms', kafkaconfig['kafka_linger_ms']) \
        .option('kafka.delivery.timeout.ms', kafkaconfig['kafka_delivery_timeout_ms']) \
        .option('acks', kafkaconfig['acks']) \
        .option('kafka.compression.type', kafkaconfig['kafka_compression_type']) \
        .option('kafka.security.protocol', kafkaconfig['kafka_security_protocol']) \
        .option('kafka.sasl.jaas.config', oauth_config) \
        .option('kafka.sasl.login.callback.handler.class', kafkaconfig['kafka_sasl_login_callback_handler_class']) \
        .option('kafka.sasl.mechanism', kafkaconfig['kafka_sasl_mechanism']) \
        .option('topic', topic_name) \
        .save()

kafka_df holds about 2.5 million records and main_df about 4 million. When I start the job, the join result contains 900k records; after loading 100k of them, the job fails after running for 25 minutes with the exception below.

py4j.protocol.Py4JJavaError: An error occurred while calling o500.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 66, 100.67.55.233, executor 0): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s) for x1-dev-asw32-edr-02a1-ba87-332c7da70fc1-topic_name:130000 ms has passed since batch creation
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:999)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:70)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:91)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:999)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:421)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

I am submitting the job on my Databricks cluster. Is the exception above due to a session timeout or to a memory problem? Can anyone tell me what is causing the exception? Any help is much appreciated.

python apache-spark apache-kafka databricks spark-structured-streaming
1 Answer

In my case, these Kafka timeout exceptions went away after repartitioning the dataframe to more partitions before writing. Because the input Kafka topic had few partitions, the dataframe had few partitions too (Spark's default behavior), which led to few tasks when writing to the other Kafka topic. When I repartitioned the dataframe to ~50 partitions, the Kafka producer appeared to use more threads and managed to write the queued data without records expiring.
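A minimal sketch of that repartition, placed inside a foreachBatch handler shaped like the one in the question; the partition count 50 comes from this answer, while spark, kafkaconfig, and topic_name are assumed to exist as in the question (this is an illustration, not the asker's actual code):

```python
# Sketch only: assumes a live SparkSession (spark), a reachable Kafka cluster,
# and the kafkaconfig / topic_name variables from the question.
def join_kafka_streams(kafka_df, batch_id):
    main_df = spark.sql('select * from table where some_filter_including_partitions')
    join_df = kafka_df.join(main_df, ['key_col1', 'key_col2', 'key_col3', 'key_col4'], 'inner')
    # Repartition before the Kafka write so more tasks (and producer instances)
    # share the output work, instead of inheriting the input topic's few partitions.
    join_df.repartition(50) \
        .write.format('kafka') \
        .option('kafka.bootstrap.servers', kafkaconfig['kafka_broker']) \
        .option('topic', topic_name) \
        .save()
```

The 50 is a tuning guess to try, not a derived value; the right number depends on cluster cores and record sizes.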

You can also increase the number of partitions in the Kafka topic you are writing to.
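If you go that route, the stock Kafka CLI can grow the partition count; the topic name and broker address below are placeholders:

```shell
# Increase the partition count of the output topic.
# Note: Kafka only allows the count to grow, never shrink.
kafka-topics.sh --bootstrap-server broker:9092 \
  --alter --topic topic_name --partitions 50
```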

Other tweaks include increasing the kafka.request.timeout.ms configuration from the default 30 seconds to, say, 5 minutes, and increasing kafka.batch.size from the default 16 KB to, say, 512 KB. With larger batches there are fewer requests and throughput goes up, which helps get records sent before they expire. For the record, this goes against the common advice of lowering the batch size from the default (see e.g. this answer), but small Kafka batches did not help in my case, and I don't see how they could: smaller batches mean more requests and lower throughput, so an individual batch may be sent faster, but there are more batches overall, so requests would still expire, just with smaller buffers.
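The fewer-requests argument can be sanity-checked with simple arithmetic. The average record size below is an assumption for illustration; only the 900k record count comes from the question:

```python
# Rough back-of-envelope for the kafka.batch.size tweak described above.
record_size_bytes = 1_000       # assumed average serialized record size
records = 900_000               # join result size from the question
default_batch = 16 * 1024       # Kafka producer default batch.size (16 KB)
tuned_batch = 512 * 1024        # tuned batch.size from this answer (512 KB)

def approx_requests(total_records, rec_size, batch_size):
    """Approximate producer request count, assuming every batch fills completely."""
    total_bytes = total_records * rec_size
    return -(-total_bytes // batch_size)   # ceiling division

print(approx_requests(records, record_size_bytes, default_batch))
print(approx_requests(records, record_size_bytes, tuned_batch))
```

With these assumed numbers the tuned batch size cuts the request count by roughly 30x, which is the mechanism by which records stop sitting in the queue long enough to expire.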
