NullPointerException when reading from Kafka when some offsets have no data


I am reading a Kafka topic with Spark Structured Streaming. The job worked fine until we started getting offsets in Kafka whose records have a null value.

[ERROR] org.apache.spark.executor.Executor - Exception in task 0.0 in stage 1.0 (TID 12)
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:113)
    at org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow(KafkaRecordToUnsafeRowConverter.scala:39)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:339)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)

I tried applying a filter on the value column, but it had no effect and the error above still occurs. The code:

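  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  // Streaming source: subscribe to the Kafka topic(s)
  val kafkaDataFrameRaw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaUrl)
    .option("subscribe", topics)
    .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
    .option("startingOffsets", startOffsetFinal)
    .option("failOnDataLoss", false)
    .load()

  // For each micro-batch, keep only the rows whose value is null and print them
  kafkaDataFrameRaw
    .writeStream
    .foreachBatch { (result: DataFrame, batchId: Long) =>
      val resultFiltered = result.filter(col("value").isNull)
      resultFiltered.show(10, false)
    }
    .start()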
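
From the stack trace, the exception appears to be thrown in `KafkaRecordToUnsafeRowConverter.toUnsafeRow`, called from the Kafka reader's `next`, which would be before the rows ever reach `foreachBatch`. If that reading is right, it would explain why a filter has no effect. Below is a minimal plain-Scala sketch of that ordering. It does not use Spark, and `Record`, `toRow`, and `readAndFilter` are hypothetical names made up for illustration, not Spark or Kafka APIs.

```scala
import scala.util.Try

object NullValueSketch {
  // Hypothetical stand-in for a Kafka record; not a Spark or Kafka class.
  final case class Record(key: String, value: Array[Byte])

  // Mimics a converter that dereferences the value without a null check.
  def toRow(r: Record): Int = r.value.length

  // Conversion runs first; the filter only sees rows that were already built,
  // so it cannot prevent an exception raised inside toRow.
  def readAndFilter(records: Iterator[Record]): Try[List[Int]] =
    Try(records.map(toRow).filter(_ > 0).toList)

  def main(args: Array[String]): Unit = {
    val records = Iterator(
      Record("a", Array[Byte](1, 2)),
      Record("a", null) // record with a null value, like the "null" lines in the consumer output
    )
    // The second record makes toRow throw a NullPointerException,
    // so the whole read fails even though a filter is applied afterwards.
    println(readAndFilter(records).isFailure)
  }
}
```

In this model the only place a null check could help is inside the conversion step itself, not in anything applied after it.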

Kafka console consumer (note the records whose value is null):

kafka-avro-console-consumer --bootstrap-server kafkabrokernode01:9091,kafkabrokernode01:9091,kafkabrokernode03:9091 --property print.key=true --partition 0 --offset 923390 --max-messages=10 --topic aosdev.tenant_unifiedplanner.plan.digital.lines --property schema.registry.url=http://schemaregurl01:8081

{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d3\"}"}     {"after":null,"patch":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mongodb"},"name":"aosdev","rs":"atlas-rnrbpj-shard-0","ns":"tenant_unifiedplanner.plan.digital.lines","sec":1678895535,"ord":31,"h":null,"initsync":{"boolean":false}},"op":{"string":"d"},"ts_ms":{"long":1678895535116}}
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d3\"}"}     null
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d4\"}"}     {"after":null,"patch":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mongodb"},"name":"aosdev","rs":"atlas-rnrbpj-shard-0","ns":"tenant_unifiedplanner.plan.digital.lines","sec":1678895535,"ord":32,"h":null,"initsync":{"boolean":false}},"op":{"string":"d"},"ts_ms":{"long":1678895535117}}
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d4\"}"}     null
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5da\"}"}     {"after":null,"patch":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mongodb"},"name":"aosdev","rs":"atlas-rnrbpj-shard-0","ns":"tenant_unifiedplanner.plan.digital.lines","sec":1678895535,"ord":33,"h":null,"initsync":{"boolean":false}},"op":{"string":"d"},"ts_ms":{"long":1678895535117}}
scala apache-spark apache-kafka spark-structured-streaming