Error when trying zipWithIndex in Java Spark


I am trying to add a column with row numbers using zipWithIndex in Spark, like this:

val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val rddzip = df.rdd.zipWithIndex
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val dfZippedWithId = spark.createDataFrame(rddzip.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

Now I want to do the same thing in Java, like this:

JavaPairRDD rddzip = df.toJavaRDD().zipWithIndex();
StructType newSchema = df.schema().add(new StructField("rowid",
        DataTypes.createArrayType(DataTypes.LongType), false, Metadata.empty()));
Dataset<Row> dfZippedWithId = sparkSession.createDataFrame(rddzip.rdd(), newSchema);

But I get the error below:

Caused by: java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.sql.Row
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1819)
        at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        ... 1 more


java apache-spark rdd apache-spark-dataset
1 Answer

In the Scala version you pass spark.createDataFrame an RDD[Row], but in the Java version you are handing it a JavaPairRDD of (Row, Long) pairs. Map that pair RDD to a JavaRDD<Row> first, appending the index to each row, just as the Scala code does with Row.fromSeq.
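For reference, here is a minimal sketch of that mapping in Java. It assumes Spark 2.x and reuses the df and sparkSession names from the question; it also types the rowid column as a plain LongType rather than an array type, so the schema matches the Long that zipWithIndex produces.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Zip each Row with its index; this yields a JavaPairRDD<Row, Long>.
JavaPairRDD<Row, Long> rddzip = df.toJavaRDD().zipWithIndex();

// Map every (row, index) pair back to a single Row with the index appended,
// mirroring Row.fromSeq(row.toSeq ++ Array(index)) from the Scala version.
JavaRDD<Row> rowsWithId = rddzip.map(tuple -> {
    Row row = tuple._1();
    Long index = tuple._2();
    Object[] fields = new Object[row.size() + 1];
    for (int i = 0; i < row.size(); i++) {
        fields[i] = row.get(i);
    }
    fields[row.size()] = index;
    return RowFactory.create(fields);
});

// The extra column is a plain LongType so it matches the appended index value.
StructType newSchema = df.schema().add("rowid", DataTypes.LongType, false);

Dataset<Row> dfZippedWithId = sparkSession.createDataFrame(rowsWithId, newSchema);

dfZippedWithId then has the original x and y columns plus a rowid column holding the 0-based row index.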
