我想使用spark从弹性搜索数据目录中检索一些数据,然后使用正式文档的方法,然后在这里出错...
这是我的代码(使用Java和JDK 1.8_221):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import java.util.Map;
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("Spark ElasticSearch");
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "10.245.142.213");
conf.set("es.port", "9200");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setLogLevel("ERROR");
JavaPairRDD<String, Map<String, Object>> esRDD =
JavaEsSpark.esRDD(sc, "au_pkt_ams/au_pkt_ams");
for(Tuple2 tuple: esRDD.collect()){
System.out.print(tuple._1()+"-------------");
System.out.println(tuple._2());
}
}
}
这是错误报告(所有日志):
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Partition$class
at org.elasticsearch.spark.rdd.EsPartition.<init>(AbstractEsRDD.scala:84)
at org.elasticsearch.spark.rdd.AbstractEsRDD$$anonfun$getPartitions$1.apply(AbstractEsRDD.scala:49)
at org.elasticsearch.spark.rdd.AbstractEsRDD$$anonfun$getPartitions$1.apply(AbstractEsRDD.scala:48)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:237)
at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
at scala.collection.immutable.List.map(List.scala:298)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:48)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
at scala.Option.getOrElse(Option.scala:138)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:361)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:360)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at Main.main(Main.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Partition$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 22 more
Process finished with exit code 1
日志说esRDD.collect()是错误的,他们无法获取文件'Partition.class',但该文件确实存在。
我有同样的问题。我发现这是Spark库和Elastic库之间的Scala版本不兼容。在我的情况下,Scala 2.12中包含Spark库,但后来我意识到elasticsearch-spark连接器仅包含scala 2.11。
我将Spark库更新为scala 2.11,还将elasticsearch-hadoop更改为elasticsearch-spark-20_2.11。这是新的pom.xml条目,对我来说效果很好。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>7.4.2</version>
</dependency>