当我尝试合并来自数据库和csv文件的两个数据集时出现错误错误消息是这样的:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, targetString), StringType), true, false) AS targetString#205
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, deviceName), StringType), true, false) AS deviceName#206
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, alarmDetectionCode), StringType), true, false) AS alarmDetectionCode#207
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write
当spark应用程序加入两个数据集时,看起来发生了不匹配具有不同的架构,但我不知道它是如何发生的。我的Java代码是这样的:
Dataset result = null;
result = deviceInfoDataset.join(searchInfo,deviceInfoDataset.col("deviceName").equalTo(searchInfo.col("deviceName")));
result.show();
数据集架构:
device
+--------+----------+----------+
|ctgry_cd|deviceInfo|deviceName|
+--------+----------+----------+
searchinfo
+------------+----------+------------------+
|targetString|deviceName|alarmDetectionCode|
+------------+----------+------------------+
这个问题似乎比我想象的要复杂。在我的场合有两个原因。1.我的数据集有一个来自csv的空行。在这种情况下,我可以使用以下代码创建并显示此数据集: