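Has anyone run into this before?
I need to configure CassandraSink in PyFlink so that aggregated data from a stream is written to a Cassandra/Scylla table. I can't find anything in the documentation or the source code on how to pass credentials to CassandraSink when setting it up.
My current code: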
cassandra_sink = CassandraSink \
    .add_sink(aggregated_stream) \
    .set_query(insert_query) \
    .set_host(CASSANDRA_HOST, int(CASSANDRA_PORT)) \
    .enable_ignore_null_fields() \
    .build()
cassandra_sink.set_parallelism(GLOBAL_PARALLELISM)
env.execute("Data Ingestion Job to Cassandra")
You can use setClusterBuilder to pass credentials and other more complex configuration to the C* (Cassandra) sink. In the Java API:
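import java.net.InetSocketAddress;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SocketOptions;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// HOST, PORT, CL, CONNECT_TIMEOUT, READ_TIMEOUT, username and password are your own values.
ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
            .withQueryOptions(new QueryOptions().setConsistencyLevel(CL))
            .withSocketOptions(new SocketOptions()
                .setConnectTimeoutMillis(CONNECT_TIMEOUT)
                .setReadTimeoutMillis(READ_TIMEOUT))
            .withCredentials(username, password)  // authentication goes here
            .build();
    }
};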
Then pass it to the sink, like this:
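// aggregatedStream stands in for your own DataStream
CassandraSink.addSink(aggregatedStream)
    .setClusterBuilder(clusterBuilder)  // use this instead of setHost(); the two are mutually exclusive
    // set your other configuration here, such as the query
    .build()
    .name("Cassandra Sink");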
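Since the question is about PyFlink, here is a rough sketch of how the same idea might look on the Python side. As far as I can tell, recent PyFlink versions expose `set_cluster_builder` on the sink builder and a thin `ClusterBuilder` wrapper in `pyflink.datastream.connectors.cassandra` that takes a Java ClusterBuilder object; please check this against your PyFlink version. The helper name `build_cassandra_sink` and its parameters are my own, and `j_cluster_builder` is assumed to be an instance of a Java class like the one above that you have compiled into a jar and made available to the job.

```python
def build_cassandra_sink(stream, insert_query, j_cluster_builder, parallelism=1):
    """Attach a Cassandra sink that connects through a custom Java ClusterBuilder.

    Hypothetical helper, not part of PyFlink. `j_cluster_builder` is assumed to be
    a Py4J reference to a Java ClusterBuilder subclass that sets the credentials.
    """
    # PyFlink is imported inside the function so that this helper can be defined
    # even where PyFlink is not importable at module load time.
    from pyflink.datastream.connectors.cassandra import CassandraSink, ClusterBuilder

    sink = (
        CassandraSink.add_sink(stream)
        .set_query(insert_query)
        # Replaces set_host(): host, port and credentials all come from the
        # Java ClusterBuilder, and the two options cannot be combined.
        .set_cluster_builder(ClusterBuilder(j_cluster_builder))
        .enable_ignore_null_fields()
        .build()
    )
    sink.set_parallelism(parallelism)
    return sink
```

You would call it as `build_cassandra_sink(aggregated_stream, insert_query, j_cluster_builder, GLOBAL_PARALLELISM)` before `env.execute(...)`. One possible way to obtain `j_cluster_builder` is through the Py4J gateway (`pyflink.java_gateway.get_gateway().jvm`) once the jar with your Java class is on the classpath, but the exact wiring depends on how you deploy the job, so treat that part as an assumption.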