我正在用 Spark Structured Streaming 读取一个 Kafka 主题,在 Kafka 中开始出现空值(null value)记录之前,作业一直运行正常。
[ERROR] org.apache.spark.executor.Executor - Exception in task 0.0 in stage 1.0 (TID 12) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:113) at org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow(KafkaRecordToUnsafeRowConverter.scala:39) at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:339) at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
我尝试对 value 列应用过滤器,但没有任何效果,上面的错误仍然出现。代码如下:
// Streaming Kafka source: each row exposes key/value as binary columns
// plus topic / partition / offset / timestamp metadata.
val kafkaDataFrameRaw = spark
  .readStream
  .format("kafka")
  // Broker list and topic subscription.
  .option("kafka.bootstrap.servers", kafkaUrl)
  .option("subscribe", topics)
  // Where to begin reading, and the per-micro-batch cap.
  .option("startingOffsets", startOffsetFinal)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  // Keep the query alive if requested offsets have been deleted on the broker.
  .option("failOnDataLoss", false)
  .load()
// BUG FIX: `foreachBatch` lives on DataStreamWriter, not DataFrame, so the
// original `kafkaDataFrameRaw.foreachBatch { ... }` does not compile —
// `.writeStream` must come first.
kafkaDataFrameRaw
  .writeStream
  .foreachBatch { (result: DataFrame, batchId: Long) =>
    // Keep only records whose Kafka value is null (e.g. Debezium delete
    // tombstones, visible in the console-consumer output above).
    // NOTE(review): this filter CANNOT prevent the NullPointerException in
    // the stack trace — it is thrown inside KafkaRecordToUnsafeRowConverter
    // while the source *reads* the record, before any filter runs. That is
    // SPARK-27494 ("Null values don't work in Kafka source v2"), fixed in
    // Spark 2.4.5 / 3.0.0; upgrading Spark is the actual remedy.
    val resultFiltered = result.filter(col("value").isNull)
    resultFiltered.show(10, false)
  }
  .start()
kafka 控制台消费者:
kafka-avro-console-consumer --bootstrap-server kafkabrokernode01:9091,kafkabrokernode02:9091,kafkabrokernode03:9091 --property print.key=true --partition 0 --offset 923390 --max-messages=10 --topic aosdev.tenant_unifiedplanner.plan.digital.lines --property schema.registry.url=http://schemaregurl01:8081
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d3\"}"} {"after":null,"patch":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mongodb"},"name":"aosdev","rs":"atlas-rnrbpj-shard-0","ns":"tenant_unifiedplanner.plan.digital.lines","sec":1678895535,"ord":31,"h":null,"initsync":{"boolean":false}},"op":{"string":"d"},"ts_ms":{"long":1678895535116}}
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d3\"}"} null
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d4\"}"} {"after":null,"patch":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mongodb"},"name":"aosdev","rs":"atlas-rnrbpj-shard-0","ns":"tenant_unifiedplanner.plan.digital.lines","sec":1678895535,"ord":32,"h":null,"initsync":{"boolean":false}},"op":{"string":"d"},"ts_ms":{"long":1678895535117}}
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5d4\"}"} null
{"id":"{ \"$oid\" : \"6411e90bed84757225eba5da\"}"} {"after":null,"patch":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mongodb"},"name":"aosdev","rs":"atlas-rnrbpj-shard-0","ns":"tenant_unifiedplanner.plan.digital.lines","sec":1678895535,"ord":33,"h":null,"initsync":{"boolean":false}},"op":{"string":"d"},"ts_ms":{"long":1678895535117}}