I am trying to read data from a Kafka topic into Spark. I am running Kafka on Docker Desktop. Here is my pom-
<groupId>com.dell</groupId>
<artifactId>sparkdemo2</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>1.11.1026</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-dell</artifactId>
<version>1.4.3</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.thrift</groupId>-->
<!-- <artifactId>libthrift</artifactId>-->
<!-- <version>1.4.3</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.emc.ecs</groupId>
<artifactId>object-client-bundle</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.5_2.12</artifactId>
<version>1.4.3</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.13</artifactId>
<version>3.3.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M7</version>
<configuration>
<argLine>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>
</configuration>
</plugin>
</plugins>
</build>
Here is my code-
package com.dell;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args)
{
// Define Kafka parameters
SparkSession spark = SparkSession.builder().appName("Kafka Spark Demo").master("local[*]").getOrCreate();
String kafkaBrokers= "localhost:9092";
String kafkaTopic = "Topic_Project";
Dataset<Row> kafkaStreamDf = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load();
kafkaStreamDf.show(5);
}
}
I keep getting this error-
Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError
I can't seem to figure out the problem here. Any ideas would be appreciated.
According to the documentation, you should change the Scala suffix of the following dependency from spark-sql-kafka-0-10_2.13 to spark-sql-kafka-0-10_2.12. All Spark artifacts on the classpath must share the same Scala suffix, and the 3.3.2 "Structured Streaming + Kafka Integration Guide" lists the _2.12 build. In other words, this block:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.13</artifactId>
<version>3.3.2</version>
<scope>test</scope>
</dependency>
should instead use these coordinates:
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.3.2
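In pom.xml form, the corrected entry would look like this. Note also that the <scope>test</scope> on your current entry keeps the connector off the main runtime classpath, which on its own is enough to produce "Failed to find data source: kafka" when main() runs, so it is dropped here:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.2</version>
    <!-- no <scope>test</scope>: the data source must be on the runtime classpath -->
</dependency>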
This should fix the error.
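One further note: even once the data source resolves, show() is not supported on a streaming Dataset; Spark rejects queries with streaming sources that are not started through writeStream. A minimal sketch of the read path (reusing the broker and topic names from your question, and printing records to the console) would be:
package com.dell;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class Main {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("Kafka Spark Demo")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> kafkaStreamDf = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "Topic_Project")
                .option("startingOffsets", "earliest")
                .load();

        // Kafka delivers key/value as binary; cast them to strings for readable output.
        Dataset<Row> decoded = kafkaStreamDf.selectExpr(
                "CAST(key AS STRING)", "CAST(value AS STRING)");

        // A streaming Dataset cannot be displayed with show(); start a streaming query instead.
        StreamingQuery query = decoded.writeStream()
                .format("console")
                .outputMode("append")
                .start();

        query.awaitTermination();
    }
}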