How do I write to Cosmos DB from a streaming query using the Azure Cosmos DB connector?


I have a simple Structured Streaming application whose output sink should be Cosmos DB. When I call the writeStream method, the error below appears. The library added to the cluster is:

com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.1, type:Maven

My code is as follows:

val outstream = staticInputDF 
  .writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .options(config)
  .start
  .awaitTermination

This results in the following error:

command-751666472135258:74: error: overloaded method value options with alternatives: (options: java.util.Map[String,String])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] &lt;and&gt; (options: scala.collection.Map[String,String])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] cannot be applied to (com.microsoft.azure.cosmosdb.spark.config.Config)

How do I write from a streaming DataFrame to a Cosmos DB collection?

scala apache-spark azure-cosmosdb spark-structured-streaming azure-databricks
1 Answer

The following code shows how to write a streaming DataFrame to Cosmos DB. The key point is to pass the settings to .options as a plain Map[String, String] rather than wrapping them in a Config object; DataStreamWriter.options only accepts a Map, which is exactly what the compiler error is telling you.

import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider

// Write configuration as a plain Map[String, String] so it can be passed to .options
val writeConfig = Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" -> "true",
  "WritingBatchSize" -> "500",
  "CheckpointLocation" -> "/checkpointlocation_write1"
)

// Write to Cosmos DB from the flights streaming DataFrame
df
  .writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .options(writeConfig)
  .start()
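
For completeness, here is a minimal sketch of a streaming source that could produce the df used above. It uses Spark's built-in rate source as a stand-in for your real input, and the id column mapping is illustrative only (Cosmos DB documents expect a string id property):

// Illustrative streaming source only; replace with your real input (Event Hubs, Kafka, files, ...)
val df = spark
  .readStream
  .format("rate")                 // built-in test source emitting (timestamp, value) rows
  .option("rowsPerSecond", "10")
  .load()
  .selectExpr("CAST(value AS STRING) AS id", "timestamp")  // Cosmos DB documents need a string id

If your streaming query contains aggregations, you may also need to set a suitable output mode via .outputMode before calling .start().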

Reference: Azure Databricks Spark Connector

Hope this helps.
