我正在使用spark-sql.2.4.1v,datastax-java-cassandra-connector_2.11-2.4.1.jar和java8。
我这样创建cassandra表:
create company(company_id int PRIMARY_KEY, company_name text);
JavaBean如下:
class CompanyRecord(
Integer company_id;
String company_name;
//getter and setters
//default & parametarized constructors
)
下面的火花代码将数据保存到cassandra表中:
Dataset<Row> latestUpdatedDs = joinUpdatedRecordsDs.select("company_id", "company_name"); /// select from other source like xls sheet
Encoder<CompanyRecord> comanyEncoder = Encoders.bean(CompanyRecord.class);
Dataset<CompanyRecord> inputDs = latestUpdatedDs.as(comanyEncoder );
inputDs
.write()
.format("org.apache.spark.sql.cassandra")
.option("table","company")
.option("keyspace", "ks_one")
.mode(SaveMode.Append)
.save();
如下所示提供错误:
[原因:org.codehaus.commons.compiler.CompileException:文件'generated.java',第562行,第35栏:编译失败:org.codehaus.commons.compiler.CompileException:文件'generated.java',行562,列35:在任何封闭类,任何超类型中,也未通过静态导入,未声明名为“ toString”的方法在org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator $ .org $ apache $ spark $ sql $ catalyst $ expressions $ codegen $ CodeGenerator $$ doCompile(CodeGenerator.scala:1304)在org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator $$ anon $ 1.load(CodeGenerator.scala:1376)在org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator $$ anon $ 1.load(CodeGenerator.scala:1373)在org.spark_project.guava.cache.LocalCache $ LoadingValueReference.loadFuture(LocalCache.java:3599)在org.spark_project.guava.cache.LocalCache $ Segment.loadSync(LocalCache.java:2379)
问题:
如何找出问题所在?以及如何解决这个问题?
据我所知:
第562行,第35列:在任何封闭类或任何超类型中都没有声明名为“ toString”的方法。
这可能是个问题,您可能需要重写CompanyRecord类的toString,Spark还可以在实现了https://spark.apache.org/docs/latest/tuning.html中提到的实现Serializable接口的自定义对象上运行。
这两件事应该可以解决您的问题。
此问题出现在数据类型不匹配的地方,即您在表中定义的内容以及您的bean /数据框试图插入的内容。
一旦我正确纠正了数据类型,问题就解决了。