pyspark checkpoint fails on local machine

Problem description | Votes: 0 | Answers: 2

I've just started learning pyspark with a standalone install on my local machine, and I can't get checkpointing to work. I've boiled the script down to this...

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PyTest").master("local[*]").getOrCreate()

spark.sparkContext.setCheckpointDir("/RddCheckPoint")
df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
df.checkpoint()

...and I get this output...

>>> spark = SparkSession.builder.appName("PyTest").master("local[*]").getOrCreate()
>>>
>>> spark.sparkContext.setCheckpointDir("/RddCheckPoint")
>>> df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
>>> df.checkpoint()
20/01/24 15:26:45 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "N:\spark\python\pyspark\sql\dataframe.py", line 463, in checkpoint
    jdf = self._jdf.checkpoint(eager)
  File "N:\spark\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1286, in __call__
  File "N:\spark\python\pyspark\sql\utils.py", line 98, in deco
    return f(*a, **kw)
  File "N:\spark\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o71.checkpoint.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
        at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
        at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
        at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
        at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
        at org.apache.spark.rdd.ReliableCheckpointRDD.getPartitions(ReliableCheckpointRDD.scala:74)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
        at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:179)
        at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:59)
        at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
        at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1801)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1791)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2118)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
        at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:689)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
        at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:680)
        at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:643)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)

The error doesn't give any specific information about why it failed. I suspect I'm missing some Spark configuration, but I'm not sure what...

pyspark spark-checkpoint
2 Answers
0 votes

You get this error because either the checkpoint directory was never created, or you don't have permission to write to it (your checkpoint directory sits directly under the root, "/").

import os

from pyspark.sql import SparkSession

# Create the checkpoint directory somewhere you can write to (here, a
# relative path under the working directory) instead of under "/".
os.makedirs("RddCheckPoint", exist_ok=True)

spark = SparkSession.builder.appName("PyTest").master("local[*]").getOrCreate()
spark.sparkContext.setCheckpointDir("RddCheckPoint")

df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
df.checkpoint()
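
One detail the snippet glosses over: DataFrame.checkpoint() is not in-place; it returns a new DataFrame backed by the checkpoint files, and the original is left untouched. If you want later operations to actually read from the checkpoint, reassign the result:

# checkpoint() returns the checkpointed DataFrame; keep the returned handle.
df = df.checkpoint()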

0 votes

I struggled with this same problem at one point. It turned out to be a problem with my winutils configuration.

I eventually found the solution via the winutils project on GitHub:

https://github.com/cdarlint/winutils

(and, to a lesser extent, https://github.com/steveloughran/winutils).

  1. Install winutils.
  2. Set the HADOOP_HOME environment variable to the path of your Hadoop version inside winutils (e.g. hadoop-3.2.0).
  3. Make sure PATH is updated as well:
    PATH=%PATH%;%HADOOP_HOME%\bin

...that should get you past the "no native library" and "access0" errors. (Thanks, cdarlint!)
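
If you prefer to set these from the script rather than in the system settings, a minimal sketch follows (C:\hadoop\hadoop-3.2.0 is a hypothetical path; point it at your extracted winutils copy). The variables must be set before the first SparkSession is created, because the launched JVM inherits the environment at startup:

import os

# Hypothetical path: the hadoop-X.Y.Z folder from the winutils repository
# that matches the Hadoop version your Spark build was compiled against.
os.environ["HADOOP_HOME"] = r"C:\hadoop\hadoop-3.2.0"
# winutils.exe and hadoop.dll live in %HADOOP_HOME%\bin; prepend it to PATH
# so the JVM can load the native IO library (the access0 call that failed).
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin" + os.pathsep + os.environ["PATH"]

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PyTest").master("local[*]").getOrCreate()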
