I have a simple Cassandra table, such as:
CREATE TABLE my_keyspace.my_table (
my_composite_pk_a bigint,
my_composite_pk_b ascii,
value blob,
PRIMARY KEY ((my_composite_pk_a, my_composite_pk_b))
) WITH bloom_filter_fp_chance = 0.1
AND gc_grace_seconds = 86400
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'enabled': 'true'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};
Note that value is a BLOB column, with each value about 1 MB in size.
I have a Spark application that is as simple as this:
// assuming Scala; import the SQL functions used below
import org.apache.spark.sql.functions.{col, length}

spark
  // read data from parquet
  .read.parquet("...")
  // skip large partitions, to avoid overwhelming Cassandra
  .withColumn("bytes_count", length(col("value")))
  .filter("bytes_count < 1000000") // < 1MB
  // project
  .select("my_composite_pk_a", "my_composite_pk_b", "value")
  // commit to Cassandra
  .writeTo("cassandra.my_keyspace.my_table")
  .append()
The following properties configure the Spark Cassandra Connector:
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes=6
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows=1
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key=none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec=6
spark.sql.catalog.cassandra.spark.cassandra.connection.host=node1,node2
spark.sql.catalog.cassandra.spark.cassandra.connection.port=9042
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level=LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics=false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS=90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count=100
spark.sql.catalog.cassandra=com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
spark.sql.catalog.cassandra.spark.cassandra.auth.username=USERNAME
spark.sql.catalog.cassandra.spark.cassandra.auth.password=PASSWORD
The relevant Spark properties are:
--total-executor-cores 6
--executor-cores 6
--executor-memory 15G
--driver-memory 6G
--driver-cores 4
So we have a single executor with 6 cores.
While the tasks are running, the following error occurs, crashing the entire Spark application:
...
24/03/06 10:07:24 WARN TaskSetManager: Lost task 886.0 in stage 3.0 (TID 2959, node1, executor 0): java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
...
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from node4:40201 in 10 seconds
...
24/03/06 10:07:24 ERROR AppendDataExec: Data source write support CassandraBulkWrite(
org.apache.spark.sql.SparkSession@2e8eafb2,
com.datastax.spark.connector.cql.CassandraConnector@1a865ebf,
TableDef(
my_keyspace,
my_table,
ArrayBuffer(
ColumnDef(
my_composite_pk_a,
PartitionKeyColumn,
BigIntType
),
ColumnDef(
my_composite_pk_b,
PartitionKeyColumn,
AsciiType
)
),
ArrayBuffer(),
Stream(
ColumnDef(
my_value,
RegularColumn,
BlobType
)
),
Stream(),
false,
false,
Map()
),
WriteConf(
RowsInBatch(1),
1000,
None,
LOCAL_QUORUM,
false,
false,
6,
Some(6.0),
TTLOption(DefaultValue),
TimestampOption(DefaultValue),false,None),
StructType(StructField(snapshot,LongType,true),
StructField(data_key,StringType,true),
StructField(value,BinaryType,true)
),
org.apache.spark.SparkConf@3903cfc9
)
These appear to be write errors from the Cassandra connector. Do you know why?
With a batch size of only 1, each 1 MB BLOB is sent as a separate request, which puts pressure on Cassandra. Consider increasing your
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows
setting for more efficient writes.
Note that very large batch sizes can cause memory problems, so start with a moderate increase and experiment.
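As a sketch, the adjusted settings might look like the following. The value 4 is purely illustrative, not a tuned recommendation: with ~1 MB blobs, even a small batch is several megabytes per request, so measure before going higher.

```properties
# Illustrative starting point only -- benchmark and adjust.
# With ~1 MB blobs, 4 rows per batch is already ~4 MB per request.
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows=4
# Optionally group rows by partition key so each batch targets a single partition:
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key=partition
```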