Environment:
Spark version: 2.3.0
Run Mode: Local
Java version: Java 8
The Spark application tries to do the following:
1) Convert the input data into a Dataset[GenericRecord]
2) Group by the key attribute of the GenericRecord
3) After grouping, use mapGroups to iterate over the list of values and produce a result as a String
4) Write the result out as strings to a text file.
The error occurs when writing the text file. Spark infers that the Dataset produced in step 3 has a binary column rather than a String column, even though the mapGroups function actually returns a String.
Is there a way to cast the column's data type, or to tell Spark that it is actually a string column rather than a binary one?
val dslSourcePath = args(0)
val filePath = args(1)
val targetPath = args(2)

val df = spark.read.textFile(filePath)

implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)

val mapResult = df.flatMap(abc => {
  JavaConversions.asScalaBuffer(/* somehow return a list of Avro GenericRecord using a Java library */).seq
})

val groupResult = mapResult
  .groupByKey(result => String.valueOf(result.get("key")))
  .mapGroups((key, valueList) => {
    val result = StringBuilder.newBuilder.append(key).append(",").append(valueList.count(_ => true))
    result.toString()
  })

groupResult.printSchema()
groupResult.write.text(targetPath + "-result-" + System.currentTimeMillis())
The printed schema shows that the column is binary:
root
 |-- value: binary (nullable = true)
And Spark raises an error saying it cannot write a binary column as text:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a string column, but you have binary.;
at org.apache.spark.sql.execution.datasources.text.TextFileFormat.verifySchema(TextFileFormat.scala:55)
at org.apache.spark.sql.execution.datasources.text.TextFileFormat.prepareWrite(TextFileFormat.scala:78)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:595)
As @user10938362 pointed out, the cause is that the following code encodes all data as bytes:
implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)
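Because the implicit is generic over any `A`, it also wins for `String`, so the mapGroups result is kryo-serialized to bytes. A sketch of two possible ways around this, assuming the Spark 2.3 Dataset API (`mapResult` stands in for the `Dataset[GenericRecord]` from the question):

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.{Encoder, Encoders}

// Option 1: narrow the implicit kryo encoder to GenericRecord only, so
// Spark's built-in String encoder still applies to the mapGroups result.
implicit val genericRecordEncoder: Encoder[GenericRecord] =
  Encoders.kryo[GenericRecord]

// Option 2: pass the String encoder explicitly. mapGroups takes its
// encoder as an implicit parameter, so it can be supplied by hand,
// overriding any generic kryo encoder that is in scope.
val groupResult = mapResult
  .groupByKey(record => String.valueOf(record.get("key")))
  .mapGroups { (key, values) =>
    s"$key,${values.size}"
  }(Encoders.STRING)

groupResult.printSchema()  // value: string (nullable = true)
groupResult.write.text(targetPath + "-result-" + System.currentTimeMillis())
```

With either change the schema of `groupResult` becomes a single string column, which the text data source accepts.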