将数据帧从Spark群集写入cassandra群集:分区和性能调整

问题描述 投票:0回答:1

我有两个集群-1. Cloudera Hadoop- Spark作业在此处运行2.云-Cassandra群集,多个DC

在将数据帧从我的spark作业写入cassandra集群时,我在写入之前在spark中进行了重新分区(repartionCount = 10)。见下文:

import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()

在我的多租户spark集群中,对于具有20M条记录且在配置下的spark批处理负载,我看到很多任务失败,资源抢占和运行中失败。

spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20 
spark.cassandra.connection.compression=LZ4

我该如何调整?归咎于责任吗?

PS:我一开始的理解是:对于具有20M行的负载,“分区”应在执行程序(每个具有2M行的分区)上平均分配负载,并且将在这些分区级别(在2M行上)进行批处理。但是现在,我怀疑如果spark-cassandra-connector在整个数据帧级别(整个20M行)进行批处理,是否会造成不必要的改组。

scala apache-spark cassandra datastax-java-driver spark-cassandra-connector
1个回答
1
投票

您无需在Spark中进行分区-只需将数据从Spark写入Cassandra,请勿尝试更改Spark Cassandra Connector的默认设置-在大多数情况下它们都可以正常工作。您需要查看发生什么类型的阶段故障-最有可能是因为spark.cassandra.output.concurrent.writes=20而使Cassandra超载(使用默认值(5))-有时更少的编写器可以帮助您更快地写入数据,因为您不会超载Cassandra,并且作业没有重新启动。

© www.soinside.com 2019 - 2024. All rights reserved.