我目前正在使用 pyspark 为机器学习应用程序执行一些数据清理。 最后一个会话崩溃了,但我设置了一个 checkpointdir 并检查了我的 DataFrame。
现在我有以下形式的检查点数据目录:
id-of-checkpoint-dir/
\\- rdd-123/
\\- rdd-456/
rdd-子文件夹中的文件似乎是十六进制文件。
我如何读取这个检查点,以便我可以继续我的数据准备而不是再次运行整个过程?
你可以试试这个
path = 'hdfs:///xxx/id-of-checkpoint-dir/rdd-123'
rdd = spark.sparkContext._jsc.checkpointFile(path)
我从官方测试用例中找到了这个方法
@Test
public void checkpointAndRestore() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath());
assertFalse(rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint
assertTrue(rdd.isCheckpointed());
assertTrue(rdd.getCheckpointFile().isPresent());
JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}