delta mergeSchema 无法将 MemoryStream 与 Spark 检查点结合使用

问题描述 投票:0回答:1

我正在使用 Spark 的

MemoryStream
测试 DeltaWriter 类来创建流(而不是 readStream),并且我想使用选项
"mergeSchema": true
将结果作为增量文件写入 s3 上,如下所示:

import org.apache.spark.sql.execution.streaming.MemoryStream
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import io.delta.implicits._

case class EnrichedPerson(id: Int, name: String, age: Int, address: String)
case class BasePerson(id: Int, name: String, age: Int)

val data1: List[BasePerson] = List(BasePerson(1, "mark ", 30), BasePerson(2, "paul", 25))
implicit val encoder1: Encoder[BasePerson] = Encoders.product[BasePerson]
val memStream1 = MemoryStream[BasePerson]
memStream1.addData(data1)
val stream1 = memStream1.toDS().toDF()
val streamQuery1 = stream1
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")
streamQuery1.processAllAvailable()

val data2: List[EnrichedPerson] = List(EnrichedPerson(11, "jhon", 31, "street 1"), EnrichedPerson(22, "luis", 32, "street 2"))
implicit val encoder2: Encoder[EnrichedPerson] = Encoders.product[EnrichedPerson]
val memStream2 = MemoryStream[EnrichedPerson]
val stream2 = memStream2.toDS().toDF()
memStream2.addData(data2)

val streamQuery2 = stream2
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")

streamQuery2.processAllAvailable()

第一次,代码在某些输入模式下运行良好(

streamQuery1
)(例如col(A)col(B),,col(C))。如果我尝试通过添加新列来更改架构(例如 col(A)col(B)col(C)col(D)),则相同的代码 (
 streamQuery2
)不会使用新列 C 更新增量表,即使我使用增量作为接收器并启用了mergeSchema。我没有收到任何错误,但是
streamQuery2
没有写入任何数据。

根据spark文档通过检查点从故障中恢复,我可以在两次执行之间更改架构,因为我的接收器(增量)允许架构更改

有条件地允许更改具有不同输出模式的投影:仅当输出接收器允许模式从“a”更改为“时,才允许将 sdf.selectExpr("a").writeStream 更改为 sdf.selectExpr("b").writeStream b”。

上述查询第一次执行时生成到 s3 的输出是:

deltaTable/
|
|__checkpoint/
|   |__commits/
|   |   |__0
|   |__offsets/
|   |   |__0
|   |__metadata
|
|__delta/
   |__....

检查检查点文件夹的内容,我发现没有包含有关我的数据架构的元数据的文件。确实,内容是:

cat deltaTable/checkpoint/metadata

{“id”:“b48487ca-5374-4b93-8e26-503184f2ed57”}

cat deltaTable/checkpoint/commits/0

v1 {“nextBatchWatermarkMs”:0}

cat deltaTable/checkpoint/offsets/0

v1 {“batchWatermarkMs”:0,“batchTimestampMs”:1649859656284,“conf”:{“spark.sql.streaming.stateStore.providerClass”:“org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider”,“spark .sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark .sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}} 0

为了解决这个问题,正如这些link1link2中所报告的那样,删除检查点就足够了,但是这个解决方案的影响很大,因为删除检查点后,我如何从相同的偏移量开始?

任何人都可以向我解释为什么删除之前的查询工作中的检查点,即使检查点文件夹中没有有关模式的元数据,以及如何使用检查点和 mergeSchema 选项来实现模式演化测试?

提前致谢!

scala apache-spark spark-structured-streaming delta-lake spark-checkpoint
1个回答
0
投票

表的模式记录在增量日志中,而不是在检查点中。您需要检查表的

_delta_log
主管下的JSON文件(例如
/user/hive/warehouse/table_name
)。

© www.soinside.com 2019 - 2024. All rights reserved.