我正在使用 Spark 的
MemoryStream
测试 DeltaWriter 类来创建流(而不是 readStream),并且我想使用选项 "mergeSchema": true
将结果作为增量文件写入 s3 上,如下所示:
import org.apache.spark.sql.execution.streaming.MemoryStream
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import io.delta.implicits._
case class EnrichedPerson(id: Int, name: String, age: Int, address: String)
case class BasePerson(id: Int, name: String, age: Int)
val data1: List[BasePerson] = List(BasePerson(1, "mark ", 30), BasePerson(2, "paul", 25))
implicit val encoder1: Encoder[BasePerson] = Encoders.product[BasePerson]
val memStream1 = MemoryStream[BasePerson]
memStream1.addData(data1)
val stream1 = memStream1.toDS().toDF()
val streamQuery1 = stream1
.writeStream
.format("delta")
.option("mergeSchema", "true")
.outputMode("append")
.option("checkpointLocation", "my/checkpointpoint/location1")
.delta("s3://bucket/raw/data/..-")
streamQuery1.processAllAvailable()
val data2: List[EnrichedPerson] = List(EnrichedPerson(11, "jhon", 31, "street 1"), EnrichedPerson(22, "luis", 32, "street 2"))
implicit val encoder2: Encoder[EnrichedPerson] = Encoders.product[EnrichedPerson]
val memStream2 = MemoryStream[EnrichedPerson]
val stream2 = memStream2.toDS().toDF()
memStream2.addData(data2)
val streamQuery2 = stream2
.writeStream
.format("delta")
.option("mergeSchema", "true")
.outputMode("append")
.option("checkpointLocation", "my/checkpointpoint/location1")
.delta("s3://bucket/raw/data/..-")
streamQuery2.processAllAvailable()
第一次,代码在某些输入模式下运行良好(
streamQuery1
)(例如col(A),col(B),,col(C))。如果我尝试通过添加新列来更改架构(例如 col(A)、col(B)、col(C)、col(D)),则相同的代码 ( streamQuery2
)不会使用新列 C 更新增量表,即使我使用增量作为接收器并启用了mergeSchema。我没有收到任何错误,但是 streamQuery2
没有写入任何数据。
根据spark文档通过检查点从故障中恢复,我可以在两次执行之间更改架构,因为我的接收器(增量)允许架构更改
有条件地允许更改具有不同输出模式的投影:仅当输出接收器允许模式从“a”更改为“时,才允许将 sdf.selectExpr("a").writeStream 更改为 sdf.selectExpr("b").writeStream b”。
上述查询第一次执行时生成到 s3 的输出是:
deltaTable/
|
|__checkpoint/
| |__commits/
| | |__0
| |__offsets/
| | |__0
| |__metadata
|
|__delta/
|__....
检查检查点文件夹的内容,我发现没有包含有关我的数据架构的元数据的文件。确实,内容是:
cat deltaTable/checkpoint/metadata
{“id”:“b48487ca-5374-4b93-8e26-503184f2ed57”}
cat deltaTable/checkpoint/commits/0
v1 {“nextBatchWatermarkMs”:0}
cat deltaTable/checkpoint/offsets/0
v1 {“batchWatermarkMs”:0,“batchTimestampMs”:1649859656284,“conf”:{“spark.sql.streaming.stateStore.providerClass”:“org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider”,“spark .sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark .sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}} 0
为了解决这个问题,正如这些link1link2中所报告的那样,删除检查点就足够了,但是这个解决方案的影响很大,因为删除检查点后,我如何从相同的偏移量开始?
任何人都可以向我解释为什么删除之前的查询工作中的检查点,即使检查点文件夹中没有有关模式的元数据,以及如何使用检查点和 mergeSchema 选项来实现模式演化测试?
提前致谢!
表的模式记录在增量日志中,而不是在检查点中。您需要检查表的
_delta_log
主管下的JSON文件(例如/user/hive/warehouse/table_name
)。