PySpark 随机森林分类器。 Pred.Show() - org.apache.spark.SparkException:无法执行用户定义的函数

问题描述 投票:0回答:2

当我执行 prep.show()/pred.head(2) 时出现错误。我有 df_train 和 df_test 如下。 pyspark随机森林分类器的数据帧格式是否错误?

>>> df_train.show(n=2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[6.0,148.0,72.0,3...|
|  0.0|[1.0,85.0,66.0,29...|
+-----+--------------------+
only showing top 2 rows
>>> df_train.head(2)
[Row(label=1.0, features=DenseVector([6.0, 148.0, 72.0, 35.0, 0.0, 33.6, 0.627, 50.0])),   Row(label=0.0, features=DenseVector([1.0, 85.0, 66.0, 29.0, 0.0, 26.6, 0.351, 31.0]))]
>>> df_test.show(n=2)
+--------------------+
|            features|
+--------------------+
|[1.0,122.0,84.0,4...|
|[2.0,110.0,92.0,1...|
+--------------------+
only showing top 2 rows
>>> df_test.head(2)
[Row(features=DenseVector([1.0, 122.0, 84.0, 47.0, 240.0, 45.8, 0.551])), Row(features=DenseVector([2.0, 110.0, 92.0, 18.0, 10.0, 22.7, 0.235]))]
>>> rf = RandomForestClassifier()
>>> model = rf.fit(df_train)
>>> pred = model.transform(df_test)
>>> pred
DataFrame[features: vector, rawPrediction: vector, probability: vector, prediction: double]
>>> pred.head(2)

21/04/22 12:41:58 错误执行器:阶段 67.0 中的任务 0.0 出现异常(TID 109) org.apache.spark.SparkException:无法执行用户定义的函数(

完全错误:

>>> pred.head(2)

21/04/22 12:41:58 ERROR Executor: Exception in task 0.0 in stage 67.0 (TID 109)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 7
    at org.apache.spark.ml.linalg.DenseVector.apply(Vectors.scala:462)
    at org.apache.spark.ml.tree.ContinuousSplit.shouldGoLeft(Split.scala:161)
    at org.apache.spark.ml.tree.InternalNode.predictImpl(Node.scala:170)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:224)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:223)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.IndexedSeqLike$$anon$1.foreach(IndexedSeqLike.scala:52)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:223)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:179)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
    ... 22 more
21/04/22 12:41:58 WARN TaskSetManager: Lost task 0.0 in stage 67.0 (TID 109, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 7
    at org.apache.spark.ml.linalg.DenseVector.apply(Vectors.scala:462)
    at org.apache.spark.ml.tree.ContinuousSplit.shouldGoLeft(Split.scala:161)
    at org.apache.spark.ml.tree.InternalNode.predictImpl(Node.scala:170)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:224)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:223)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.IndexedSeqLike$$anon$1.foreach(IndexedSeqLike.scala:52)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:223)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:179)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
    ... 22 more

21/04/22 12:41:58 ERROR TaskSetManager: Task 0 in stage 67.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 1252, in head
    return self.take(n)
  File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 571, in take
    return self.limit(num).collect()
  File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 533, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o719.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 67.0 failed 1 times, most recent failure: Lost task 0.0 in stage 67.0 (TID 109, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 7
    at org.apache.spark.ml.linalg.DenseVector.apply(Vectors.scala:462)
    at org.apache.spark.ml.tree.ContinuousSplit.shouldGoLeft(Split.scala:161)
    at org.apache.spark.ml.tree.InternalNode.predictImpl(Node.scala:170)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:224)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:223)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.IndexedSeqLike$$anon$1.foreach(IndexedSeqLike.scala:52)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:223)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:179)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
    ... 22 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 7
    at org.apache.spark.ml.linalg.DenseVector.apply(Vectors.scala:462)
    at org.apache.spark.ml.tree.ContinuousSplit.shouldGoLeft(Split.scala:161)
    at org.apache.spark.ml.tree.InternalNode.predictImpl(Node.scala:170)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:224)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$$anonfun$predictRaw$1.apply(RandomForestClassifier.scala:223)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.IndexedSeqLike$$anon$1.foreach(IndexedSeqLike.scala:52)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:223)
    at org.apache.spark.ml.classification.RandomForestClassificationModel.predictRaw(RandomForestClassifier.scala:179)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
    at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
    ... 22 more
python pyspark
2个回答
0
投票

df_test 缺少一项功能。 (由以下原因引起:java.lang.ArrayIndexOutOfBoundsException:7 ).


0
投票

我在使用 df.show() 时遇到了类似的问题。具有讽刺意味的是,它对于我的数据集中的前 16 行可以完美地工作,但不会显示超过 16 行。在检查我的数据后,我发现某些特征中的某些值是空值,瞧,它们从第 17 行开始。

为了解决这个问题,我在使用 VectorAssembler 之前使用输入器填充了缺失值,一切都运行良好。

© www.soinside.com 2019 - 2024. All rights reserved.