我正在尝试使用Apache Spark来处理我的大型(~230k条目)cassandra数据集,但我经常遇到不同类型的错误。但是,当在数据集~200个条目上运行时,我可以成功运行应用程序。我有3个节点的火花设置,1个主节点和2个工作人员,2个工作人员也安装了一个cassandra集群,其数据索引复制因子为2.我的2个火花工人在Web界面上显示2.4和2.8 GB内存我在运行应用程序时将spark.executor.memory
设置为2409,以获得4.7 GB的组合内存。这是我的WebUI主页
其中一个任务的环境页面
在这个阶段,我只是尝试使用spark来处理存储在cassandra中的数据。这是我用来在Java中执行此操作的基本代码
SparkConf conf = new SparkConf(true)
.set("spark.cassandra.connection.host", CASSANDRA_HOST)
.setJars(jars);
SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SparkContextJavaFunctions context = javaFunctions(sc);
CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
System.out.println(rdd.count());
为了成功运行,在一个小数据集(200个条目)上,事件界面看起来像这样
但是当我在一个大型数据集上运行相同的东西(即我只改变CASSANDRA_COLUMN_FAMILY
)时,作业永远不会终止在终端内,日志看起来像这样
大约2分钟后,执行者的stderr看起来像这样
大约7分钟后,我明白了
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
在我的终端,我必须手动杀死SparkSubmit
进程。但是,大型数据集是从仅占用22 MB的二进制文件编制索引的,并且在执行nodetool status
时,我可以看到在我的两个cassandra节点中只存储了~115 MB的数据。我也尝试在我的数据集上使用Spark SQL,但也有类似的结果。对于Transformation-Action程序和使用Spark SQL的程序,我的设置在哪里出错,我应该怎样做才能成功处理我的数据集。
我已经尝试过以下方法
-Xms1G -Xmx1G
增加内存,但程序失败,但有一个例外,我应该设置spark.executor.memory
,我有。spark.cassandra.input.split.size
,它没有说它不是一个有效的选项,类似的选项是spark.cassandra.input.split.size_in_mb
,我设置为1,没有任何效果。编辑
基于this答案,我也尝试了以下方法:
spark.storage.memoryFraction
设为0spark.storage.memoryFraction
设置为零并使用persist
与MEMORY_ONLY
,MEMORY_ONLY_SER
,MEMORY_AND_DISK
和MEMORY_AND_DISK_SER
。版本:
我认为最新的spark-cassandra-connector存在问题。参数spark.cassandra.input.split.size_in_mb
应该具有64 MB的默认值,在代码中被解释为64字节。这会导致创建太多分区,无法通过spark计划。尝试将conf值设置为
spark.cassandra.input.split.size_in_mb=67108864