如何连接到 jdbc 作为 Databricks 中的流源

问题描述 投票:0回答:1

使用 https://github.com/sutugin/spark-streaming-jdbc-source 中的示例 我试图连接到 Postgres 数据库作为 AWS Databricks 中的流媒体源。

我有一个集群在运行: 11.3 LTS(包括 Apache Spark 3.3.0、Scala 2.12)

这个库安装在我的集群上: org.apache.spark:spark-streaming_2.12:3.3.2

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder
.appName("StructuredJDBC")
.getOrCreate()

import spark.implicits._

val jdbcOptions = Map(
"user" -> "myusername",
"password" -> "mypassword",
"database" -> "testDB",
"driver" -> "org.postgresql.Driver",
"url" -> "jdbc:postgresql://dbhostname:5432:mem:myDb;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false"
)

// Create DataFrame representing the stream of input lines from jdbc
val stream = spark.readStream
.format("jdbc-streaming")
.options(jdbcOptions + ("dbtable" -> "dimensions_test_table") + ("offsetColumn" -> "loaded_timestamp"))
.load

// Start running the query that prints 'select result' to the console
val query = stream.writeStream
.outputMode("append")
.format("console")
.start()

query.awaitTermination()

但我被错误困扰: NoClassDefFoundError:org/apache/spark/sql/sources/v2/StreamWriteSupport 引起:ClassNotFoundException:org.apache.spark.sql.sources.v2.StreamWriteSupport

我能找到的关于此错误的唯一信息似乎不适用于我的情况。 我错过了什么?

我寻找过其他库,但这似乎是唯一一个支持 jdbc 作为 Scala 2.12 源的库。

scala jdbc databricks spark-streaming aws-databricks
1个回答
0
投票

这里有几个问题:

  • 你不需要在 Databricks 集群上安装

    org.apache.spark:spark-streaming_2.12:3.3.2
    库。 Databricks 运行时包括所有必需的 Spark 库,通过安装开源版本,您很可能会破坏 Databricks 特定的修改。

  • 要使用这个库,你需要自己编译并安装到集群上。但正如我所见,它已经 4 年没有更新了,默认情况下它是为 Spark 3.0(与 DBR 7.3 匹配)编译的。

如果您想从数据库中获取更改,您可以查看更改数据捕获功能,例如 CDC for RDS MySQL。然后数据可以登陆到 S3,例如,使用 Delta Live Tables 实现 CDC 模式

© www.soinside.com 2019 - 2024. All rights reserved.