底层方法抛出异常时的SparkStream

问题描述 投票:0回答:1

我有一个连续读取 Kafka 的 spark readStream 函数。我对数据执行了一些操作,并想使用 Spark writeStream 将其批量写入 Cassandra DB。在写入 Cassandra 时,它可能会抛出任何类型的异常(ConnectionTimeOut 等)。我该怎么做才能确保数据没有丢失,我该怎么做才能对该特定批次的数据执行重试。

这是我的 writeStream 函数,它在内部调用我们执行写入表的保存方法。

query = df.writeStream \
    .outputMode("append") \
    .option("checkpointLocation", "path") \
    .option("failOnDataLoss", "false") \  
    .option("maxAttempts", "5") \ 
    .option("retryOnDataLoss", "true") \ 
    .option("failedWriteFile", "path") \
    .foreachBatch(save) \
    .start()

这是保存方法。

`def save(df, batch_id):
    try:
        (df.write
         .format("org.apache.spark.sql.cassandra")
         .options(table=tableName, keyspace=keyspaceName)
         .mode("append")
         .save())
        return None
    except Exception as e:
        raise e`

据我所知,当 save 方法抛出异常时,spark 函数会再次重试该批处理,直到重试次数耗尽。即使它仍然失败,它也会写入指定路径并继续处理下一批。

那些选项

maxAttempts
,
retryOnDataLoss
,
failedWriteFile
, 仍然有效吗?我没有在官方 sparkDocs 或 spark-cassandra-connector 库中找到任何参考资料。 或者还有其他选择。

https://github.com/datastax/spark-cassandra-connector

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch

apache-spark cassandra databricks spark-streaming spark-structured-streaming
1个回答
0
投票

配置:

   .option("retryOnDataLoss", "true")
   .option("failedWriteFile", "path") 

可以毫无问题地移除,spark已经处理了异常发生时失败任务的重试

© www.soinside.com 2019 - 2024. All rights reserved.