I'm writing a batch query that uses Kafka as a source, following the Kafka integration guide, and I want to submit that batch periodically (e.g. once a day) to process the records added since the last run. While testing in pyspark I noticed that every batch run reads all records, not just the ones added since the last run. My code looks roughly like this.
The question: what do I have to change so that each run only processes the new Kafka records?
builder = (pyspark.sql.SparkSession.builder.appName("MyApp")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", s3a_host_port)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:/data/custom-log4j.properties")
)
my_packages = [
    # "io.delta:delta-spark_2.12:3.0.0" -> not needed, since configure_spark_with_delta_pip below adds it
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "org.apache.hadoop:hadoop-client-runtime:3.3.4",
    "org.apache.hadoop:hadoop-client-api:3.3.4",
    "io.delta:delta-contribs_2.12:3.0.0",
    "io.delta:delta-hive_2.12:3.0.0",
    "com.amazonaws:aws-java-sdk-bundle:1.12.603",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
]
# Create a Spark instance with the builder
# As a result, you now can read and write Delta tables
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
kdf = (spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("kafka.security.protocol", kafka_security_protocol)
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
    .option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{kafka_username}\" password=\"{kafka_password}\";")
    .option("includeHeaders", "true")
    .option("subscribe", "filebeat")
    .option("checkpointLocation", "s3a://checkpointlocation/")
    .load())
kdf = kdf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
out = kdf...
(out.select(["message", "partition", "offset"])
    .show(
        truncate=False,
        n=MAX_JAVA_INT,
    ))
spark.stop()
This prints a table in which I can see that the same offsets are processed on every run.
You are reading the topic in batch mode, which by default sets
startingOffsets = earliest
, so every run starts from the beginning of the topic. Also, checkpointLocation
has no effect in batch mode; you have to read in streaming mode (spark.readStream...
), and the processed offsets will be stored in the checkpoint.
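A minimal sketch of the streaming variant, reusing the connection settings from the question. Note that checkpointLocation goes on the writer, not the reader, and that trigger(availableNow=True) (Spark 3.3+) processes everything currently available and then stops, which matches the "run once a day" requirement. The output format and path here ("delta", "s3a://mybucket/mytable") are assumptions for illustration:

```python
# Streaming read: Spark tracks the consumed offsets in the checkpoint,
# so each run picks up where the previous run stopped.
kdf = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("kafka.security.protocol", kafka_security_protocol)
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
    .option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{kafka_username}\" password=\"{kafka_password}\";")
    .option("includeHeaders", "true")
    .option("subscribe", "filebeat")
    .load())

query = (kdf
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition", "offset")
    .writeStream
    .format("delta")  # hypothetical sink; could also be "parquet", "console", ...
    # The checkpoint is where the processed offsets are persisted between runs.
    .option("checkpointLocation", "s3a://checkpointlocation/")
    .trigger(availableNow=True)  # drain what is available now, then stop
    .start("s3a://mybucket/mytable"))  # hypothetical output path

query.awaitTermination()
```

Each scheduled invocation of this script then behaves like an incremental batch: it reads only the offsets not yet recorded in the checkpoint and terminates when it catches up.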