Kafka offset issue with Structured Streaming in batch mode on Apache Spark 3.5

Problem description

Following the Kafka integration guide, I am writing a batch query that uses Kafka as a source, and I want to run that batch periodically (e.g. once a day) to process the records added since the previous run. During test runs with pyspark I noticed that every batch run reads all records, not just the ones added since the previous run. My code looks roughly like this.

The question: what do I have to change so that each run only processes the new Kafka records?

import pyspark
from delta import configure_spark_with_delta_pip

builder = (pyspark.sql.SparkSession.builder.appName("MyApp")
            .master("local[*]")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
            .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
            .config("spark.hadoop.fs.s3a.endpoint", s3a_host_port)
            .config("spark.hadoop.fs.s3a.path.style.access", "true")
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:/data/custom-log4j.properties")
           )

my_packages = [
                   # "io.delta:delta-spark_2.12:3.0.0", -> no need, since configure_spark_with_delta_pip below adds it
                   "org.apache.hadoop:hadoop-aws:3.3.4",
                   "org.apache.hadoop:hadoop-client-runtime:3.3.4",
                   "org.apache.hadoop:hadoop-client-api:3.3.4",
                   "io.delta:delta-contribs_2.12:3.0.0",
                   "io.delta:delta-hive_2.12:3.0.0",
                   "com.amazonaws:aws-java-sdk-bundle:1.12.603",
                   "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
               ]

# Create a Spark instance with the builder
# As a result, you now can read and write Delta tables
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()


kdf = (spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("kafka.security.protocol", kafka_security_protocol)
        .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
        .option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{kafka_username}\" password=\"{kafka_password}\";")
        .option("includeHeaders", "true")
        .option("subscribe", "filebeat")
        .option("checkpointLocation", "s3a://checkpointlocation/")
        .load())

kdf = kdf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)")

out = kdf...

(out.select(["message", "partition", "offset"])
                     .show(
                        truncate=False,
                        n=MAX_JAVA_INT
                    ))


spark.stop()

This prints a table in which I can see that every run processes the same offsets.

apache-spark pyspark spark-structured-streaming
1 Answer

You are reading the topic in batch mode, which by default uses startingOffsets = earliest. Also, checkpointLocation has no effect in batch mode; you have to read in streaming mode with spark.readStream..., and the processed offsets will then be stored in that checkpoint.
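
A minimal sketch of that approach, assuming the same topic, credentials, and Spark session as in the question, with a Delta sink whose output path (s3a://mybucket/kafka-output/) is only a placeholder: readStream plus trigger(availableNow=True) makes a scheduled job drain only the offsets not yet recorded in the checkpoint and then stop, which matches the once-a-day pattern described above.

# Read the topic as a stream instead of a batch
kdf = (spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("kafka.security.protocol", kafka_security_protocol)
        .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
        .option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{kafka_username}\" password=\"{kafka_password}\";")
        .option("includeHeaders", "true")
        .option("subscribe", "filebeat")
        .load())

out = kdf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers",
                     "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)")

# checkpointLocation belongs on the writer; the consumed offsets are tracked there,
# so the next run starts where the previous one stopped.
query = (out.writeStream
         .format("delta")                                             # placeholder sink; any supported sink works
         .option("checkpointLocation", "s3a://checkpointlocation/")   # offsets are stored here
         .trigger(availableNow=True)                                  # process everything new, then stop
         .start("s3a://mybucket/kafka-output/"))                      # placeholder output path

query.awaitTermination()

With availableNow the query behaves like an incremental batch job: it processes all data available at start time and terminates, so it can be scheduled daily without leaving a long-running stream.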
