I tried to use zipWithIndex in Spark to add a column with row numbers, like this:
val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val rddzip = df.rdd.zipWithIndex;
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val dfZippedWithId = spark.createDataFrame(rddzip.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
But I want to do the same thing in Java:
JavaPairRDD rddzip = df.toJavaRDD().zipWithIndex();
StructType newSchema = dataset.schema().add(new StructField("rowid",
DataTypes.createArrayType(DataTypes.LongType), false, Metadata.empty()));
dfZippedWithId = sparkSession.createDataFrame(dataset1.rdd(), schema);
However, I get the error below:
Caused by: java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.sql.Row
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1819)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
In the Scala version you pass an RDD[Row] to spark.createDataFrame, while in the Java code you are passing a JavaPairRDD<Row, Long>. You should first map it to a JavaRDD<Row>, combining each row with its index just as the Scala code does with Row.fromSeq. Also note two smaller problems in the Java snippet: the rowid field should be a plain LongType, not an ArrayType of LongType, and createDataFrame must receive the mapped row RDD, not the original dataset's rdd().
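A minimal Java sketch of that mapping might look as follows (the class name, the sample data, and the local SparkSession setup are illustrative; only the zipWithIndex-to-JavaRDD<Row> conversion is the point):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ZipWithIndexExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("zipWithIndex").master("local[*]").getOrCreate();

        // Same sample data as the Scala snippet
        Dataset<Row> df = spark.createDataFrame(
                Arrays.asList(
                        RowFactory.create(1.0, 2.0),
                        RowFactory.create(0.0, -1.0),
                        RowFactory.create(3.0, 4.0),
                        RowFactory.create(6.0, -2.3)),
                new StructType()
                        .add("x", DataTypes.DoubleType, false)
                        .add("y", DataTypes.DoubleType, false));

        // rowid is a plain LongType, not an ArrayType
        StructType newSchema = df.schema()
                .add(new StructField("rowid", DataTypes.LongType, false, Metadata.empty()));

        JavaPairRDD<Row, Long> zipped = df.toJavaRDD().zipWithIndex();

        // Flatten each (Row, index) pair into one Row,
        // the Java counterpart of Row.fromSeq(row.toSeq ++ Array(index))
        JavaRDD<Row> rowsWithId = zipped.map(pair -> {
            List<Object> values = new ArrayList<>();
            for (int i = 0; i < pair._1.length(); i++) {
                values.add(pair._1.get(i));
            }
            values.add(pair._2);
            return RowFactory.create(values.toArray());
        });

        // Pass the mapped JavaRDD<Row>, not the original dataset's rdd()
        Dataset<Row> dfZippedWithId = spark.createDataFrame(rowsWithId, newSchema);
        dfZippedWithId.show();
        spark.stop();
    }
}
```

The map step is what avoids the ClassCastException: zipWithIndex produces Tuple2 elements, and createDataFrame with a Row schema requires an RDD of Row, so each tuple has to be converted before the DataFrame is built.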