Spark Structured Streaming with spark-acid writeStream (with checkpoint) throws org.apache.hadoop.fs.FileAlreadyExistsException

Problem description · Votes: 0 · Answers: 1

In our Spark application we use Spark Structured Streaming. It reads from Kafka as the input stream and writes to a Hive table through a HiveAcid writeStream. The HiveAcid format comes from an open-source library by Qubole called spark-acid: https://github.com/qubole/spark-acid
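(For reference, a minimal sketch of how the library can be pulled into the build. The artifact coordinates and version below are assumptions on my part and should be verified against the spark-acid README.)

// build.sbt sketch -- group, artifact name and version are assumptions,
// check https://github.com/qubole/spark-acid before using them
libraryDependencies += "com.qubole" %% "spark-acid" % "0.6.0"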

Below is our code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._        // col, from_unixtime, year, month, dayofmonth
import org.apache.spark.sql.streaming.Trigger  // Trigger.ProcessingTime
import za.co.absa.abris.avro.functions.from_confluent_avro
....

val spark = SparkSession
  .builder()
  .appName("events")
  .config("spark.sql.streaming.metricsEnabled", true)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val input_stream_df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("startingOffsets", '{"events":{"0":2310384922,"1":2280420020,"2":2278027233,"3":2283047819,"4":2285647440}}')
  .option("maxOffsetsPerTrigger", 10000)
  .option("subscribe", "events")
  .load()

// schema registry config
val srConfig = Map(
  "schema.registry.url"           -> "http://schema-registry:8081",
  "value.schema.naming.strategy"  -> "topic.name",
  "schema.registry.topic"         -> "events",
  "value.schema.id"               -> "latest"
)

val data = input_stream_df
  .withColumn("value", from_confluent_avro(col("value"), srConfig))
  .withColumn("timestamp_s", from_unixtime($"value.timestamp" / 1000))
  .select(
    $"value.*",
    year($"timestamp_s")       as 'year,
    month($"timestamp_s")      as 'month,
    dayofmonth($"timestamp_s") as 'day
  )

// format "HiveAcid" is provided by spark-acid lib from Qubole
val output_stream_df = data.writeStream.format("HiveAcid")
  .queryName("hiveSink")
  .option("database", "default")
  .option("table", "events_sink")
  .option("checkpointLocation", "/user/spark/events/checkpoint")
  .option("spark.acid.streaming.log.metadataDir", "/user/spark/events/checkpoint/spark-acid")
  .option("metastoreUri", "thrift://hive-metastore:9083")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

output_stream_df.awaitTermination()

We were able to deploy the application to production and redeployed it several times (around 10) without any problem. Then we ran into the following error:

Query hiveSink [id = 080a9f25-23d2-4ec8-a8c0-1634398d6d29, runId = 990d3bba-0f7f-4bae-9f41-b43db6d1aeb3] terminated with exception: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 42, 10.236.7.228, executor 3): org.apache.hadoop.fs.FileAlreadyExistsException: /warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18/delta_0020079_0020079/bucket_00003 for client 10.236.7.228 already exists (...)
at com.qubole.shaded.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
at com.qubole.shaded.orc.impl.WriterImpl.<init>(WriterImpl.java:177)
at com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:334)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.initWriter(OrcRecordUpdater.java:602)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSimpleEvent(OrcRecordUpdater.java:423)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:432)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.insert(OrcRecordUpdater.java:484)
at com.qubole.spark.hiveacid.writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:295)
at com.qubole.spark.hiveacid.writer.TableWriter$anon$1$anonfun$6.apply(TableWriter.scala:153)
(...)
at com.qubole.spark.hiveacid.writer.TableWriter$anon$1.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$anon$1.apply(TableWriter.scala:139)

Each time we restart the application, it reports a different delta + bucket file as already existing. Yet those files seem to be newly created on every start, so we don't understand why the error is thrown.
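(A small diagnostic sketch, not part of our application: it lists the delta directories under one partition with the Hadoop FileSystem API, so we can compare what actually exists on disk with the path reported in the exception. The warehouse path is taken from the error message above; adjust it for your table.)

import org.apache.hadoop.fs.{FileSystem, Path}

// List existing delta_* directories for one partition of the sink table
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partition = new Path("/warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18")
fs.globStatus(new Path(partition, "delta_*")).foreach(status => println(status.getPath))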

Any pointers would be greatly appreciated.

apache-spark spark-structured-streaming qubole spark-hive spark-checkpoint
1 Answer

0 votes

I found the actual root cause in the worker error logs: a code change I had made in one of the libraries we use was causing an out of memory problem.

What I posted above is the driver's error log, produced after several failures on the worker nodes.
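(To make executor-side failures easier to spot without digging through each worker's log files, here is a minimal sketch of a SparkListener that logs task failure reasons on the driver. It is a generic diagnostic, not something from the original application; the println target and the string check on the reason are deliberately simple.)

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the failure reason of every non-successful task on the driver,
// so errors like the executor OutOfMemoryError surface in the driver log.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    if (taskEnd.reason.toString != "Success") {
      println(s"Task ${taskEnd.taskInfo.taskId} failed: ${taskEnd.reason}")
    }
  }
})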
