我有一个连续读取 Kafka 的 spark readStream 函数。我对数据执行了一些操作,并想使用 Spark writeStream 将其批量写入 Cassandra DB。在写入 Cassandra 时,它可能会抛出任何类型的异常(ConnectionTimeOut 等)。我该怎么做才能确保数据没有丢失,我该怎么做才能对该特定批次的数据执行重试。
这是我的 writeStream 函数,它在内部调用我们执行写入表的保存方法。
query = df.writeStream \
.outputMode("append") \
.option("checkpointLocation", "path") \
.option("failOnDataLoss", "false") \
.option("maxAttempts", "5") \
.option("retryOnDataLoss", "true") \
.option("failedWriteFile", "path") \
.foreachBatch(save) \
.start()
这是保存方法。
`def save(df, batch_id):
try:
(df.write
.format("org.apache.spark.sql.cassandra")
.options(table=tableName, keyspace=keyspaceName)
.mode("append")
.save())
return None
except Exception as e:
raise e`
据我所知,当 save 方法抛出异常时,spark 函数会再次重试该批处理,直到重试次数耗尽。即使它仍然失败,它也会写入指定路径并继续处理下一批。
那些选项
maxAttempts
, retryOnDataLoss
, failedWriteFile
, 仍然有效吗?我没有在官方 sparkDocs 或 spark-cassandra-connector 库中找到任何参考资料。
或者还有其他选择。
https://github.com/datastax/spark-cassandra-connector
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
配置:
.option("retryOnDataLoss", "true")
.option("failedWriteFile", "path")
可以毫无问题地移除,spark已经处理了异常发生时失败任务的重试