我想处理A中的一些数据,并用处理后的结果替换A。
在 write() 操作完成后,有什么“地方”我可以做一些事情吗?或者有没有办法替换原始目录以确保其中只有最终结果?
顺便说一句,如果我将结果写入我读取的数据源的同一目录中,会有副作用吗?
我当前的程序逻辑(有错误)如下所示:
spark.sql()
处理一些聚合spark.sql()
完全连接结果并选择我需要的列(架构不会改变)我在 AWS Glue 4.0、Spark 3.3、Scala 2 和 Python 3 中运行了代码
我已经知道我所做的只是生成一个 DAG,并且只有当 write() 被调用时,该过程才会开始,如果我使用
overwrite
模式,源数据将在我读取它们之前被删除。
尽管我可以在同一父路径中创建一个新的 s3 对象,但我无法删除旧的对象以保持结果唯一。
我通常会避免直接覆盖,因为如果发生中间故障,您将丢失旧数据和新数据,并且您将拥有一些以您无法真正使用的周期开始的 Spark 中间文件。
此外,较小的数据集可能适用于读取和写入同一位置,因为 Spark 临时中间文件最终会一次性覆盖原始文件。但较大的数据集可能有多个中间写入。这可能会导致循环引用问题,因为 Spark 不会跟踪已处理的数据。
我认为,使用临时目录进行中间处理,从临时目录复制到 s3://bucket-B/1970/01/01/00 并删除临时目录
shutil.rmtree(temp_dir)