将流数据加入表数据并在流接收时更新表,这可能吗?

问题描述 投票:0回答:1

我正在使用spark-sql 2.4.1,spark-cassandra-connector_2.11-2.4.1.jar和java8。我有一个场景,我需要将流数据与C * / Cassandra表数据结合在一起。

如果找到记录/联接,我需要将现有的C *表记录复制到另一个table_bkp并使用最新数据更新实际的C *表记录。

随着流数据的进入,我需要执行此操作。这可以使用spark-sql steaming完成吗?如果是这样,该怎么办?有什么需要注意的注意事项?

对于每个批次如何新鲜获取C *表数据?

[我在这里做什么错

我有两个表,如下所示:“ master_table”和“ backup_table”

table kspace.master_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );

table kspace.backup_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    backup_timestamp timestamp,
    PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC,   backup_timestamp DESC);


Each streaming record would have "statement_flag" which might be "I" or "U".
If record with "I" comes we directly insert into "master_table".
If record with "U" comes , need to check if there is any record for given ( statement_id ), statement_date in "master_table".
     If there is record in "master_table" copy that one to "backup_table" with current timestamp i.e. backup_timestamp
     Update the record in "master_table" with latest record.

要实现以上目的,我正在像下面这样执行PoC /代码

Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));

String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";


Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);

writeDfToCassandra( baseDs.toDF(), keyspace, master_table);


u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");

Dataset<Row> joinUpdatedRecordsDs =  sparkSession.sql(
            " select p.statement_id, p.statement_flag, p.statement_date,"
            + "p.x_val,p.y_val,p.z_val "
            + " from persisted_records as p "
            + "join u_records as u "
            + "on p.statement_id = u.statement_id  and p.statement_date = u.statement_date");



Dataset<Row> updated_records =   joinUpdatedRecordsDs
                            .withColumn("backup_timestamp",current_timestamp());

updated_records.show(); //Showing correct results 


writeDfToCassandra( updated_records.toDF(), keyspace, backup_table);  // But here/backup_table copying the latest "master_table" records 

样本数据

对于带有“ I”标志的第一条记录

master_table

enter image description here

backup_table

enter image description here

对于带有“ U”标志的第二条记录,即与先前相同,但“ y_val”列数据除外

master_table

enter image description here

backup_table

预期

enter image description here

但是实际表数据是

enter image description here

问题:

直到显示显示正确数据的数据框(updated_records)。但是,当我将相同的dataframe(updated_records)插入表中时,C * backup_table数据显示的数据与master_table的最新记录完全相同,但假定它具有master_table的较早记录。

  updated_records.show(); //Showing correct results 


    writeDfToCassandra( updated_records.toDF(), keyspace, backup_table);  // But here/backup_table copying the latest "master_table" records 

那么,我在上述程序代码中怎么了?

apache-spark apache-spark-sql spark-streaming datastax cassandra-3.0
1个回答
1
投票

有多种方法可以实现不同级别的性能,具体取决于需要检查的数据量。

例如,如果仅按分区键查找数据,最有效的方法是在Dstream上使用joinWithCassandraTable。对于每一批,它将提取与传入分区键匹配的记录。在结构化流中,这将通过正确编写的SQL连接和DSE自动发生。如果不使用DSE,它将对每个批次的表进行全面扫描。

如果不是每个批次都需要整个表,则将DStream批次与CassandraRDD结合使用将导致在每个批次上完全重新读取RDD。如果不重写整个表,这将更加昂贵。

如果仅更新记录而不检查它们的先前值,只需将传入数据直接写入C *表就足够了。 C *使用upserts和最后一次写入获胜行为,并且将覆盖以前的值(如果存在)。

© www.soinside.com 2019 - 2024. All rights reserved.