我有两个集群-1. Cloudera Hadoop- Spark作业在此处运行2.云-Cassandra群集,多个DC
在将数据帧从我的spark作业写入cassandra集群时,我在写入之前在spark中进行了重新分区(repartionCount = 10)。见下文:
import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
.mode(SaveMode.Append)
.options(options)
.option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
.option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
.save()
在我的多租户spark集群中,对于具有20M条记录且在配置下的spark批处理负载,我看到很多任务失败,资源抢占和运行中失败。
spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20
spark.cassandra.connection.compression=LZ4
我该如何调整?归咎于责任吗?
PS:我一开始的理解是:对于具有20M行的负载,“分区”应在执行程序(每个具有2M行的分区)上平均分配负载,并且将在这些分区级别(在2M行上)进行批处理。但是现在,我怀疑如果spark-cassandra-connector在整个数据帧级别(整个20M行)进行批处理,是否会造成不必要的改组。
您无需在Spark中进行分区-只需将数据从Spark写入Cassandra,请勿尝试更改Spark Cassandra Connector的默认设置-在大多数情况下它们都可以正常工作。您需要查看发生什么类型的阶段故障-最有可能是因为spark.cassandra.output.concurrent.writes=20
而使Cassandra超载(使用默认值(5
))-有时更少的编写器可以帮助您更快地写入数据,因为您不会超载Cassandra,并且作业没有重新启动。