根据官方文档,我使用下面的代码段写入kafka主题,但未写入kafka。
finalStream = final \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers",bootstrap_servers) \
.option("topic",topic_name) \
.option("checkpointLocation", check_point_location) \
.start()
finalStream.awaitTermination()
但是通过使用awaitAnyTermination()
而不是awaitTermination()
,可以写入kafka。
spark.streams.awaitAnyTermination()
请提出原因。
awaitTermination
是阻止呼叫。这意味着仅在查询终止时才执行写操作。
还请注意,如果您有多个流/查询,那么如果希望同时执行它们,则应始终使用awaitAnyTermination()
。在这种情况下,仅当先前的流/查询已完成时,awaitTermination()
才允许执行后续的流/查询。