Spark Structured Streaming stops processing after Kafka offsets expire

Problem description — 0 votes, 1 answer

We have a Spark Structured Streaming application that pushes data from Kafka to S3. The Spark job runs fine for a few days and then starts to accumulate lag. Our Kafka topic has a retention period of 6 hours. When the lag grows and some offsets expire, Spark can no longer find those offsets and starts logging warnings. On the surface the job still appears to be running, but it no longer processes any data. When I tried to restart the system manually, I ran into GC issues (see the screenshot below). I have already set "failOnDataLoss" to "false". We want the system to keep processing even when an offset cannot be found. Apart from the warning shown below, I don't see any errors in the logs.

[screenshot: GC errors during manual restart]
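For reference, here is a minimal sketch of a query with this shape (Kafka source, S3 sink, failOnDataLoss=false). The broker address, S3 paths, trigger interval and maxOffsetsPerTrigger value are illustrative placeholders rather than our real configuration; only the topic name is taken from the logs below:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object KafkaToS3 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("kafka-to-s3").getOrCreate()

        // Kafka source; failOnDataLoss=false tells Spark not to fail the query
        // when requested offsets have already been deleted by retention.
        val kafkaDf = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
          .option("subscribe", "DataPipelineCopy")
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", "false")
          // Bounding the per-batch size can help the job keep up instead of
          // building lag beyond the 6-hour retention window (value is a placeholder).
          .option("maxOffsetsPerTrigger", "500000")
          .load()

        // S3 sink (paths are placeholders).
        val query = kafkaDf
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .format("parquet")
          .option("path", "s3a://my-bucket/data/")
          .option("checkpointLocation", "s3a://my-bucket/checkpoints/")
          .trigger(Trigger.ProcessingTime("1 minute"))
          .start()

        query.awaitTermination()
      }
    }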

The only warning we see is this one:

20/05/17 17:16:30 WARN InternalKafkaConsumer: The current available offset range is AvailableOffsetRange(34066048,34444327). Offset 34005119 is out of range, and records in [34005119, 34006993) will be skipped (GroupId: spark-kafka-source-6b17001a-01ff-4c10-8877-7677cdbbecfc--1295174908-executor, TopicPartition: DataPipelineCopy-46). Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".

20/05/17 17:16:30 INFO Fetcher: [Consumer clientId=consumer-7, groupId=spark-kafka-source-6b17001a-01ff-4c10-8877-7677cdbbecfc--1295174908-executor] Resetting offset for partition DataPipelineCopy-1 to offset 34444906.
20/05/17 17:16:30 WARN InternalKafkaConsumer: Some data may be lost. Recovering from the earliest offset: 34068782
20/05/17 17:16:30 WARN InternalKafkaConsumer: The current available offset range is AvailableOffsetRange(34068782,34444906). Offset 34005698 is out of range, and records in [34005698, 34007572) will be skipped (GroupId: spark-kafka-source-6b17001a-01ff-4c10-8877-7677cdbbecfc--1295174908-executor, TopicPartition: DataPipelineCopy-1). Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".

Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {DataPipelineCopy-1=34005698}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:470)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:361)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:64)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:500)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/05/17 17:16:30 WARN ConsumerConfig: The configuration 'consumer.commit.groupid' was supplied but isn't a known config.
20/05/17 17:16:30 INFO AppInfoParser: Kafka version: 2.0.0

Before the failure above, the system appeared to be running normally; afterwards, however, it does not process any new data from Kafka.


apache-spark apache-kafka spark-structured-streaming
1 Answer

0 votes

"The current available offset range is AvailableOffsetRange(34066048, 34444327). Offset 34005119 is out of range."

It looks like your application is processing records much more slowly than expected, so the corresponding log segments in Kafka are deleted by retention before you read them. Can you check what retention period is configured? The offsets your job is asking for are indeed below the lower end of the available offset range.
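If you want to verify the retention from code rather than from the broker/topic configs, here is a minimal sketch using Kafka's AdminClient (the bootstrap server is a placeholder; the topic name is taken from your logs):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.AdminClient
    import org.apache.kafka.common.config.ConfigResource

    object CheckRetention {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092") // placeholder

        val admin = AdminClient.create(props)
        try {
          // Describe the topic-level configs and read retention.ms.
          val resource = new ConfigResource(ConfigResource.Type.TOPIC, "DataPipelineCopy")
          val configs = admin.describeConfigs(Collections.singleton(resource)).all().get()
          val retentionMs = configs.get(resource).get("retention.ms").value()
          println(s"retention.ms for DataPipelineCopy = $retentionMs")
        } finally {
          admin.close()
        }
      }
    }

A value of 21600000 ms would match the 6-hour retention you mention; once your end-to-end lag approaches that, the oldest unread offsets get deleted and you see exactly these warnings.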
