Spark Cassandra Connector: ERROR AppendDataExec: Data source write support CassandraBulkWrite


I have a simple Cassandra table like this:

CREATE TABLE my_keyspace.my_table (
    my_composite_pk_a bigint,
    my_composite_pk_b ascii,
    value blob,
    PRIMARY KEY ((my_composite_pk_a, my_composite_pk_b))
) WITH bloom_filter_fp_chance = 0.1
   AND gc_grace_seconds = 86400
   AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
   AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'enabled': 'true'}
   AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};

Note that the value column holds BLOBs, each roughly 1 MB in size.

I have a Spark application that is as simple as this:

// imports for the column functions used below
import org.apache.spark.sql.functions.{col, length}

spark
  // read data from parquet
  .read.parquet("...")
  // skip large partitions, to avoid overwhelming Cassandra
  .withColumn("bytes_count",length(col("value")))
  .filter("bytes_count < 1000000") // < 1MB
  // project
  .select("my_composite_pk_a", "my_composite_pk_b", "value")
  // commit to Cassandra
  .writeTo("cassandra.my_keyspace.my_table")
  .append()

The Spark Cassandra Connector is configured with the following properties:

spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes=6 
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows=1 
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key=none 
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec=6 
spark.sql.catalog.cassandra.spark.cassandra.connection.host=node1,node2 
spark.sql.catalog.cassandra.spark.cassandra.connection.port=9042 
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level=LOCAL_QUORUM 
spark.sql.catalog.cassandra.spark.cassandra.output.metrics=false 
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS=90000 
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count=100 
spark.sql.catalog.cassandra=com.datastax.spark.connector.datasource.CassandraCatalog 
spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions 
spark.sql.catalog.cassandra.spark.cassandra.auth.username=USERNAME 
spark.sql.catalog.cassandra.spark.cassandra.auth.password=PASSWORD
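
For completeness, the same catalog properties could equivalently be set when building the SparkSession; a minimal sketch (the application name is just a placeholder, the credentials are the same placeholders as above):

import org.apache.spark.sql.SparkSession

// Sketch: configure the Cassandra catalog programmatically instead of via spark-defaults.conf / --conf
val spark = SparkSession.builder()
  .appName("cassandra-bulk-load") // placeholder application name
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.sql.catalog.cassandra", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.sql.catalog.cassandra.spark.cassandra.connection.host", "node1,node2")
  .config("spark.sql.catalog.cassandra.spark.cassandra.connection.port", "9042")
  .config("spark.sql.catalog.cassandra.spark.cassandra.auth.username", "USERNAME")
  .config("spark.sql.catalog.cassandra.spark.cassandra.auth.password", "PASSWORD")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes", "6")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows", "1")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key", "none")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec", "6")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
  .getOrCreate()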

The relevant Spark submission properties are:

--total-executor-cores 6 
--executor-cores 6 
--executor-memory 15G 
--driver-memory 6G 
--driver-cores 4 

So we have a single executor with 6 cores (6 total executor cores / 6 cores per executor = 1 executor).

While the job is running, the following error occurs, which crashes the whole Spark application:

...
24/03/06 10:07:24 WARN TaskSetManager: Lost task 886.0 in stage 3.0 (TID 2959, node1, executor 0): java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
...
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply from node4:40201 in 10 seconds
...
24/03/06 10:07:24 ERROR AppendDataExec: Data source write support CassandraBulkWrite(
    org.apache.spark.sql.SparkSession@2e8eafb2,
    com.datastax.spark.connector.cql.CassandraConnector@1a865ebf,
    TableDef(
        my_keyspace,
        my_table,
        ArrayBuffer(
            ColumnDef(
                my_composite_pk_a,
                PartitionKeyColumn,
                BigIntType
            ), 
            ColumnDef(
                my_composite_pk_b,
                PartitionKeyColumn,
                AsciiType
            )
        ),
        ArrayBuffer(),
        Stream(
            ColumnDef(
                my_value,
                RegularColumn,
                BlobType
            )
        ),
        Stream(),
        false,
        false,
        Map()
    ),
    WriteConf(
        RowsInBatch(1),
        1000,
        None,
        LOCAL_QUORUM,
        false,
        false,
        6,
        Some(6.0),
        TTLOption(DefaultValue),
        TimestampOption(DefaultValue),
        false,
        None
    ),
    StructType(
        StructField(snapshot,LongType,true),
        StructField(data_key,StringType,true),
        StructField(value,BinaryType,true)
    ),
    org.apache.spark.SparkConf@3903cfc9
)

It looks like a write error from the Cassandra connector.

Do you know why?

apache-spark cassandra datastax datastax-enterprise
1 Answer

Writing 1 MB BLOBs with a batch size of only 1 puts pressure on Cassandra. Consider increasing spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows (currently 1) to get more efficient writes.

Note that very large batch sizes can cause memory problems, so start with a moderate increase and experiment.
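
For example, as a purely illustrative starting point (the value 16 is hypothetical and should be tuned by experiment against your cluster and payload size):

spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows=16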
