根据官方文档,我使用下面的代码段写入kafka主题,但它没有写入kafka。
finalStream = final \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers",bootstrap_servers) \
.option("topic",topic_name) \
.option("checkpointLocation", check_point_location) \
.start()
finalStream.awaitTermination()
但是通过使用 awaitAnyTermination()
而不是 awaitTermination()
,写成卡夫卡作品。
spark.streams.awaitAnyTermination()
请提出背后的原因。