我使用python和apache beam从kafka读取流数据并将数据插入到大查询表中,但我想批量插入数据而不是流方式。
我尝试将管道流模式设置为 True 并将批量大小添加到
WriteToBigQuery
方法,但数据以流模式插入到 bq 表中。另外,我尝试将管道流模式设置为 False,但在 Kafka 主题中需要读取的数据太多,管道被卡住了。有什么办法可以做到这一点吗?
我找到了解决问题的方法。 我决定将
WriteToBigQuery
的加载方法从 method=STREAMING_INSERT
更改为 method=LOAD_FILES
。
我的数据流工作陷入了从 Kafka 消费者向 Bigquery 写入数据的困境。我看到数据是从消费者以正确的格式消费的。
您能否帮助我了解如何实现以流模式从 Kafak 消费者向 Bigquery 写入数据