如何在 PyFlink 中为 CassandraSink 提供凭证

问题描述 投票:0回答:1

请告诉我,也许有人遇到过这种情况。

我需要在 PyFlink 中配置 CassandraSink 以便将聚合数据写入流上的 Cassandra/Skylla 表。我在文档或源代码中找不到如何在设置时将凭据传递给 CassandraSink

我当前的代码


cassandra_sink = CassandraSink \
        .add_sink(aggregated_stream) \
        .set_query(insert_query) \
        .set_host(CASSANDRA_HOST, int(CASSANDRA_PORT)) \
        .enable_ignore_null_fields() \
        .build()

cassandra_sink.set_parallelism(GLOBAL_PARALLELISM)
env.execute("Data Ingestion Job to Cassandra")
python cassandra apache-flink scylla pyflink
1个回答
0
投票

您需要一个集群构建器对象来传递通过

setClusterBuilder
传递的 c* 接收器的凭据和其他复杂配置。

ClusterBuilder clusterBuilder = new ClusterBuilder() {    
    @Override    
    protected Cluster buildCluster(Cluster.Builder builder) {      
        return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT))
                      .withQueryOptions(new QueryOptions().setConsistencyLevel(CL))                    
                      .withSocketOptions(new SocketOptions()                   
                      .setConnectTimeoutMillis(CONNECT_TIMEOUT)                    
                      .setReadTimeoutMillis(READ_TIMEOUT)) 
                      .withCredentials(username, password)                   
                      .build();    
    }
};

然后将其提供给水槽,如下所示:

cassandra_sink = CassandraSink \
    .setClusterBuilder(builder)
    // .set your other configurations such as queries, etc.
    .build()
    .name("Cassandra Sink");

© www.soinside.com 2019 - 2024. All rights reserved.