Reading data from a Kafka topic throws an error


I am trying to read data from a Kafka topic into Spark. I am running Kafka on Docker Desktop. Here is my pom:

<groupId>com.dell</groupId>
<artifactId>sparkdemo2</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-bundle</artifactId>
        <version>1.11.1026</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-dell</artifactId>
        <version>1.4.3</version>
    </dependency>
    <!--        <dependency>-->
    <!--            <groupId>org.apache.thrift</groupId>-->
    <!--            <artifactId>libthrift</artifactId>-->
    <!--            <version>1.4.3</version>-->
    <!--        </dependency>-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.emc.ecs</groupId>
        <artifactId>object-client-bundle</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark-runtime-3.5_2.12</artifactId>
        <version>1.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
        <version>3.3.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>3.0.0-M7</version>
            <configuration>
                <argLine>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>
            </configuration>
        </plugin>
    </plugins>
</build>

Here is my code:

package com.dell;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Main {
    public static void main(String[] args) {
        // Define Kafka parameters
        SparkSession spark = SparkSession.builder().appName("Kafka Spark Demo").master("local[*]").getOrCreate();
        String kafkaBrokers = "localhost:9092";
        String kafkaTopic = "Topic_Project";
        Dataset<Row> kafkaStreamDf = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load();
        kafkaStreamDf.show(5);
    }
}

I keep getting this error:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError

I can't seem to figure out the problem here. Any ideas would help.

java apache-spark apache-kafka
1 Answer

According to the documentation, you should change the artifact of the following dependency from

spark-sql-kafka-0-10_2.13

to

spark-sql-kafka-0-10_2.12

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
    <version>3.3.2</version>
    <scope>test</scope>
</dependency>

should instead use the following coordinates (note that the test scope is also dropped, so the connector ends up on the compile/runtime classpath rather than only the test classpath):

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.3.2
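
Applied to the pom, the corrected dependency block would look like this. This is a sketch based on the coordinates above; whichever Scala suffix you pick, it should match the suffix of your other Spark artifacts (your spark-core and spark-sql use _2.13):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.2</version>
</dependency>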

This should resolve the error.
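
Separately, once the kafka data source is found, kafkaStreamDf.show(5) will still fail, because Spark does not allow show() on a streaming Dataset; a streaming query has to be started instead. A minimal sketch of the main class using the console sink, with the broker and topic values taken from the question:

package com.dell;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class Main {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("Kafka Spark Demo")
                .master("local[*]")
                .getOrCreate();

        // Same source definition as in the question.
        Dataset<Row> kafkaStreamDf = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "Topic_Project")
                .option("startingOffsets", "earliest")
                .load();

        // show() is not allowed on a streaming Dataset; start a streaming
        // query that prints each micro-batch to the console instead.
        StreamingQuery query = kafkaStreamDf
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .writeStream()
                .format("console")
                .start();

        query.awaitTermination();
    }
}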
