将值从Spark Java追加到Cassandra Map列中

问题描述 投票:1回答:1

我有3列的cassandra表。

id text,
value text,
mappings map<text,text>

可以说样本数据如下:

id        | value       | mappings
-----------------------------------------------
1ABC      | xyz         | {"a":"abc","b":"bcd"}

在一个火花作业中,我为id 1ABCb映射计算了一个新值,作为HashMap Ex: "b":"xyz"(可以将映射转换为JavaRDD)

如何使用cassandra java spark连接器将此值附加(覆盖)到表中?我一直在看this如何处理CQL集合追加的示例,但似乎无法弄清楚如何在Java中执行此操作。任何指针将不胜感激。

java apache-spark cassandra spark-java
1个回答
1
投票
解决了以下问题。

通过传递新参数或使用Spark会话中的参数来创建cassandra连接器。

import com.datastax.spark.connector.cql.CassandraConnector; CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf()); // or pass different values for spark.cassandra.connection.host, username and password rdd.foreach(new VoidFunction<TestBean>() { @Override public void call(TestBean t) throws Exception { final String id = t.getId(); final Map<String, String> mappings = t.getMappings(); boolean isUpdated = connector.withSessionDo(new AbstractFunction1<Session, Boolean>() { @Override public Boolean apply(Session v1) { ResultSet updateResultSet = v1.execute(v1.prepare("update test set mappings = mappings + ? where id = ?") .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) .bind(mappings, id)); return updateResultSet.wasApplied(); } }); } });

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.