如何在Cassandra中将列从一个键空间移动到另一个键空间

问题描述 投票:2回答:2

我可以将下面提到的一些列(cc_payment,keyid)从Cassandra键空间billing移动到其他Cassandra payments键空间吗? payment_info将成为一张新桌子。

我有什么方法可以搬家吗?或者我是否需要复制到csv文件并使用COPY FROM选项导入?由于数据量巨大,我正在寻找直接从一个键空间移动到另一个键空间的选项。我们正在使用datastax cassandra。

感谢您的帮助。

    FROM
========

keyspace:  billing
create table if not exists billing_info (
      user_id text,
      billing_id timeuuid,
      cc_payment frozen<cc_payment>,
      keyid text;
      PRIMARY KEY((user_id), billing_id)
) WITH CLUSTERING ORDER BY (billing_id DESC);

    TO
======
keyspace:  payments
create table if not exists payment_info (
      user_id text,
      payment_id timeuuid,
      cc_payment frozen<cc_payment>,
      keyid text;
      PRIMARY KEY((user_id), payment_id)
) WITH CLUSTERING ORDER BY (payment_id DESC);
cassandra datastax-enterprise cassandra-3.0
2个回答
2
投票

有多种方法可以做到这一点:

直接复制文件,然后更改表结构

因为表只有一个列名,所以直接复制文件要快得多,如下所示:

  • 创建一个表payments.payment_info,其结构与billing.billing_info完全相同
  • 停止写信给billing.billing_info

然后在群集的每个节点上执行以下操作:

  • 为它冲洗:nodetool flush billing billing_info
  • 切换到Cassandra的数据目录
  • 在运行Cassandra的同一用户下将文件billing/billing_info-<ID_of_the_table>/*复制到payments/payment_info-<ID_of_the_table>/
  • 执行nodetool refreshpayments.payment_info`
  • 在cqlsh中检查数据是否可用
  • 使用以下命令执行列的重命名:ALTER TABLE payments.payment_info RENAME billing_id TO payment_id;

通过复制,例如使用DSBulk或Spark来迁移数据。

如果您正在使用DSE,那么您可以使用DSBulk(最好采用最新版本)从一个表中卸载数据并加载到另一个表中。通过将数据写入标准输出并通过Unix管道从标准输入读取它,此命令可以在不创建中间副本的情况下工作,尽管在这种情况下它会更慢,因为它无法实现必要的并行性。

在最简单的情况下,它将被调用如下,提供更改的字段名称之间的映射(有关详细信息,请参阅文档0:

dsbulk unload -k ks1 -t table1 -c json | dsbulk load -k ks2 -t table2 -c json -m "mapping_to_accomodate_changes_in_field_names"

但是,如果您不仅需要复制数据,还需要复制其他东西,例如TTL和WriteTime,那么任务将会更加复杂 - 在这种情况下,您需要显式导出它,然后分批加载数据,分别为每列。


1
投票

Spark可以使用这个小片段。您可以在updateColumns中执行所需操作

val myKeyspace = "oldkeyspace" 
val myTable = "oldtable"
val newKeyspace = "newkeyspace" 
val newTable = "newtabl"

def updateColumns(row: CassandraRow): CassandraRow = { 
     val inputMap = row.toMap val newData = Map( "newColumn" -> "somevalue" ) 
     var outputMap = inputMap ++ newData CassandraRow.fromMap(outputMap) 
}

val result = sc.cassandraTable(myKeyspace, myTable) .map(updateColumns(_)) 
  .saveToCassandra(newKeyspace, newTable)
© www.soinside.com 2019 - 2024. All rights reserved.