我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段
private static void sparkNatsTester() {
SparkSession spark = SparkSession.builder()
.appName("spark-with-nats")
.master("local")
.config("spark.jars",
"libs/nats-spark-connector-balanced_2.12-1.1.4.jar,"+"libs/jnats-2.17.1.jar")
.config("spark.sql.streaming.checkpointLocation","tmp/checkpoint")
.config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate();
System.out.println("sparkSession : "+ spark);
Dataset<Row> df = spark.readStream()
.format("nats")
.option("nats.host", "localhost")
.option("nats.port", 4222)
.option("nats.stream.name", "mystream")
.option("nats.stream.subjects", "mysubject")
.option("nats.durable.name", "myconsumer")
.option("nats.msg.ack.wait.secs", 120)
.load();
Dataset<Row> deDuplicatedDf = df.dropDuplicates("dateTime", "content");
StreamingQuery query;
try {
query = deDuplicatedDf.withColumn("date_only",
from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
.writeStream()
.outputMode("append")
.partitionBy("date_only")
.format("delta")
//.option("checkpointLocation", "tmp/checkpoint")
.start("tmp/outputdelta");
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
当 Spark 应用程序正在处理消息时,如果我们在中间停止 Spark 应用程序并在一段时间后重新启动 Spark 应用程序,则在应用程序停止之前的最后一个微批次中处理的消息将被重新处理。我想避免处理重复项,因此尝试添加
Dataset<Row> deDuplicatedDf = df.dropDuplicates("dateTime", "content");
按照streaming-deduplication中的建议添加dropDuplicates后,应用程序变得非常非常慢,而且也不会删除重复项。 请注意,重新启动 Spark 应用程序后,由于 NATS 重新发送未确认的消息,重复消息将到达应用程序。我什至尝试将水印与 dropDuplicates 一起使用。但它仍然不起作用。 如何解决这个问题?有没有更好的方法来避免以 Delta Lake 作为接收器的 Spark 结构化流中的重复?
更新: 问题更详细
从 NATS CLI 中,我运行以下命令来推送 10k 消息进行测试
nats pub newsub --count=10000 "test #{{Count}}"
当 Spark 应用程序正在处理消息时,停止 Spark 应用程序并计算输出文件夹中已处理消息的数量 = 3100 条消息。 正如我在 tmp\outputdelta_delta_log 文件夹中看到的,停止应用程序之前生成的最后一个文件包含以下内容:
{"commitInfo":{"timestamp":1701081648739,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"193fec24-78ea-4ac0-87af-7b6232e748ff","epochId":"20"},"readVersion":19,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"100","numOutputBytes":"1753","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"36a435e7-f775-4f80-a637-48a40c23dc87"}}
{"txn":{"appId":"193fec24-78ea-4ac0-87af-7b6232e748ff","version":20,"lastUpdated":1701081648739}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-5c9955af-06c7-4da6-991a-2412f033cd38.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":1753,"modificationTime":1701081648739,"dataChange":true,"stats":"{\**"numRecords\":100**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #3001\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #3100\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}
如您所见,最后一个微批次有 numRecords = 100 ,并包含从 test #3001 到 test #3100
的消息现在我正在重新启动 Spark 应用程序。当它完成处理来自 NATS 流的待处理消息(10000-3100 = 6900 条消息)后,如果我检查输出文件夹中已处理消息的数量 = 10100 条消息! 它应该正好是 10000 条。100 条消息被重复。
如果我检查 tmp\outputdelta_delta_log 文件夹中重新启动 Spark 应用程序后生成的第一个文件包含以下内容:
{"commitInfo":{"timestamp":1701081772472,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","epochId":"10"},"readVersion":30,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"1000","numOutputBytes":"6250","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"d1a5e794-6827-4c3d-bfa0-e1fd9e3474c9"}}
{"txn":{"appId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","version":10,"lastUpdated":1701081772472}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-8e45d89b-6426-43f5-8144-ee826874dcfb.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":6250,"modificationTime":1701081772455,"dataChange":true,"stats":"{\"**numRecords\":1000**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #3001\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #4000\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}
正如您在此微批次中所看到的 numRecords = 1000 ,并包含从 test #3001 到 test #4000 的消息。即使处理了 #3001 到 #3100 消息,它仍然从 test #3001 开始处理通过应用程序关闭之前的上一个微批次。原因可能是在这 100 条消息被处理并保存在 Delta Lake 之后,在确认发送回 NATS 之前,应用程序已关闭。因此,一旦应用程序重新启动,NATS 就会在新的微批次中再次重新发送这 100 条消息。 我如何避免在 Delta Lake 中重新处理和保存这些重新发送的消息以避免重复。
更新2
df.dropDuplicates("dateTime", "content")
如果我不停止并重新启动 Spark 应用程序,则工作正常!我尝试推送 100 条具有相同内容“测试”的消息,这些消息通过使用
nats pub mysubject --count=100 "test"
同时触发。它只允许 1 条消息,我可以看到只有 1 条消息保存在输出 Delta Lake 中!所以看起来 dropDuplicates 在我重新启动 Spark 应用程序后不起作用。一旦我们停止 Spark 应用程序,它是否会丢失保存的数据/状态以查找重复项?如果是的话,我们如何确保 dropDuplicates 即使在我们停止并重新启动应用程序后也能正常工作?
重新启动后看到重复项的可能原因是您注释掉了运行之间保存状态的检查点位置。而且,速度慢的可能原因是您正在使用默认状态提供程序,该提供程序将状态保留在 JVM 中并导致垃圾收集运行更频繁。
我建议你尝试使用RocksDB来保存状态,看看它是否会提高你的性能和体验:
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
df = df.withWatermark("dateTime", "1 hour")
df = df.dropDuplicates("dateTime", "content")
我建议使用内容的哈希来进行重复数据删除,否则会使Spark存储太多数据
“请注意,重新启动 Spark 应用程序后,由于 NATS 重新发送未确认的消息,重复的消息将到达应用程序。” - 这不是一件好事吗?
如果您没有提交微批次,则不会将任何内容保存到目标,因此您应该重新处理微批次,否则将会丢失数据。