Spark consumer stream from a Kafka topic keeps resetting offsets

Question · Votes: 0 · Answers: 1

I am running a Spark streaming program that reads from a Kafka topic with the following readStream options:

options = {
        ...
        ...
        "kafka.security.protocol": "SASL_SSL",
        "startingOffsets": "earliest",
        "maxOffsetsPerTrigger": records_per_trigger,
        "subscribe": topic,
        "auto.offset.reset": "earliest"
    }

I set both "startingOffsets" and "auto.offset.reset" to earliest, but according to the logs it keeps resetting the offset to 142634681. Yet, judging by the output I am getting, it is clearly still consuming messages from the Kafka topic. So are the logs inaccurate, or am I misunderstanding how offsets work?
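For context, a sketch of how these options would typically be wired into readStream (the bootstrap server is a placeholder; `topic` and `records_per_trigger` are the variables from the question). Note that per the Structured Streaming Kafka integration guide, Kafka consumer properties must be passed with a `kafka.` prefix, and the plain `"auto.offset.reset"` key is simply ignored by the Spark source — Spark manages offsets itself through `startingOffsets` and its checkpoint, and rejects `kafka.auto.offset.reset` outright:

```python
# Sketch only: assumes pyspark and a reachable SASL_SSL Kafka cluster;
# broker address and credentials are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-offset-demo").getOrCreate()

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("kafka.security.protocol", "SASL_SSL")
    .option("subscribe", topic)
    # Spark-level option: where to start when no checkpoint exists yet.
    .option("startingOffsets", "earliest")
    # Per-micro-batch rate limit; the resume position comes from the
    # checkpoint, not from Kafka consumer-group offsets.
    .option("maxOffsetsPerTrigger", records_per_trigger)
    .load()
)
```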

Resetting offset for partition edgebook_db_ca_on_history.journal_entries-0 to offset 142634681.

23/02/28 20:06:52 INFO org.apache.kafka.clients.Metadata: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Cluster ID: lkc-09j5mp
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Discovered group coordinator b14-pkc-3w22w.us-central1.gcp.confluent.cloud:9092 (id: 2147483633 rack: null)
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] (Re-)joining group
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] (Re-)joining group
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Finished assignment for group at generation 1: {consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1-1f851ea3-4e44-4a9e-b0e3-a5a75d9cbf3d=Assignment(partitions=[edgebook_db_ca_on_history.journal_entries-0])}
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Successfully joined group with generation 1
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Notifying assignor about the new Assignment(partitions=[edgebook_db_ca_on_history.journal_entries-0])
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Adding newly assigned partitions: edgebook_db_ca_on_history.journal_entries-0
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Found no committed offset for partition edgebook_db_ca_on_history.journal_entries-0
23/02/28 20:06:52 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Seeking to EARLIEST offset of partition edgebook_db_ca_on_history.journal_entries-0
23/02/28 20:06:53 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Resetting offset for partition edgebook_db_ca_on_history.journal_entries-0 to offset 0.
23/02/28 20:06:54 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Seeking to LATEST offset of partition edgebook_db_ca_on_history.journal_entries-0
23/02/28 20:06:54 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0-1, groupId=spark-kafka-source-379c9446-a679-47c1-bad8-fbf11b3db3e0--1283692680-driver-0] Resetting offset for partition edgebook_db_ca_on_history.journal_entries-0 to offset 142634681.
apache-spark pyspark apache-kafka apache-kafka-streams kafka-topic
1 Answer

0 votes

That is just how Spark resets the offset to the latest position in the partition. You can take a look at this code; it will help you understand how Spark reads offsets in a streaming scenario: KafkaOffsetReader.
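In other words, the "Seeking to LATEST … Resetting offset … to offset 142634681" lines are Spark probing how far the partition currently extends; the batch it actually reads still starts from the checkpointed position and is capped by maxOffsetsPerTrigger. A simplified, single-partition model of that cap (the real rate limiter splits the budget proportionally across partitions):

```python
def next_batch_end(start_offset, latest_offset, max_offsets_per_trigger):
    """Exclusive upper bound of the next micro-batch for one partition.

    Simplified model: with a single partition, the whole
    maxOffsetsPerTrigger budget goes to that partition.
    """
    return min(latest_offset, start_offset + max_offsets_per_trigger)

# First trigger starting from earliest (0) with a 10k-record budget:
print(next_batch_end(0, 142634681, 10_000))          # -> 10000
# A later trigger that has nearly caught up to the log end:
print(next_batch_end(142630000, 142634681, 10_000))  # -> 142634681
```

So the large number in the log is the current log-end offset being rediscovered each trigger, not evidence that consumption restarted from scratch.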

But does the code always reset the offset to that specific number? Could you push one message to the Kafka topic, check the logs again, and see whether the offset has moved to the new latest position?

Also, look at the checkpoint location: go into the offsets folder and verify that the offsets are advancing as expected.
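To inspect those files, note that each batch file under `<checkpointLocation>/offsets/<batchId>` is plain text: a version line (`v1`), a batch-metadata JSON line, then one JSON line per source mapping topic → partition → next offset to read (format as observed in Spark 3.x; treat as an assumption for other versions). A small stdlib-only parser, exercised here on a synthetic file mimicking the question's topic:

```python
import json
import tempfile
from pathlib import Path

def read_source_offsets(offset_file):
    """Parse one Spark Structured Streaming offset log file.

    Returns a list with one dict per source:
    {topic: {partition (str): next offset (int)}}.
    """
    version, _metadata, *sources = Path(offset_file).read_text().splitlines()
    assert version == "v1", f"unexpected offset log version: {version}"
    return [json.loads(line) for line in sources]

# Synthetic example mimicking the log lines in the question.
with tempfile.TemporaryDirectory() as d:
    f = Path(d) / "0"
    f.write_text(
        "v1\n"
        '{"batchWatermarkMs":0,"batchTimestampMs":1677614812000}\n'
        '{"edgebook_db_ca_on_history.journal_entries":{"0":142634681}}\n'
    )
    offsets = read_source_offsets(f)
    print(offsets[0]["edgebook_db_ca_on_history.journal_entries"]["0"])
```

If the partition value printed for successive batch files keeps increasing, consumption is progressing normally regardless of the "Resetting offset" log lines.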
