我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段
private static void sparkNatsTester() {
SparkSession spark = SparkSession.builder()
.appName("spark-with-nats")
.master("local")
.config("spark.jars",
"libs/nats-spark-connector-balanced_2.12-1.1.4.jar,"+"libs/jnats-2.17.1.jar")
.config("spark.sql.streaming.checkpointLocation","tmp/checkpoint")
.config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate();
System.out.println("sparkSession : "+ spark);
Dataset<Row> df = spark.readStream()
.format("nats")
.option("nats.host", "localhost")
.option("nats.port", 4222)
.option("nats.stream.name", "mystream")
.option("nats.stream.subjects", "mysubject")
.option("nats.durable.name", "myconsumer")
.option("nats.msg.ack.wait.secs", 120)
.load();
Dataset<Row> deDuplicatedDf = df.dropDuplicates("dateTime", "content");
StreamingQuery query;
try {
query = deDuplicatedDf.withColumn("date_only",
from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
.writeStream()
.outputMode("append")
.partitionBy("date_only")
.format("delta")
//.option("checkpointLocation", "tmp/checkpoint")
.start("tmp/outputdelta");
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
当 Spark 应用程序正在处理消息时,如果我们在中间停止 Spark 应用程序并在一段时间后重新启动 Spark 应用程序,则在应用程序停止之前的最后一个微批次中处理的消息将被重新处理。我想避免处理重复项,因此尝试添加
Dataset<Row> deDuplicatedDf = df.dropDuplicates("dateTime", "content");
按照streaming-deduplication中的建议添加dropDuplicates后,应用程序变得非常非常慢,而且也不会删除重复项。 请注意,重新启动 Spark 应用程序后,由于 NATS 重新发送未确认的消息,重复消息将到达应用程序。我什至尝试将水印与 dropDuplicates 一起使用。但它仍然不起作用。 如何解决这个问题?有没有更好的方法来避免以 Delta Lake 作为接收器的 Spark 结构化流中的重复?
重新启动后看到重复项的可能原因是您注释掉了运行之间保存状态的检查点位置。而且,速度慢的可能原因是您正在使用默认状态提供程序,该提供程序将状态保留在 JVM 中并导致垃圾收集运行更频繁。
我建议你尝试使用RocksDB来保存状态,看看它是否会提高你的性能和体验:
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")