我想通过execute.Async调用对cassandra db进行异步调用在manuel中,我找到了这段代码,但是我不明白如何将所有行收集到任何列表中。真正的基本调用,例如从表中选择*,我想存储所有结果。
https://docs.datastax.com/en/developer/java-driver/4.4/manual/core/async/
CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();
// Chain one async operation after another:
CompletionStage<AsyncResultSet> responseStage =
sessionStage.thenCompose(
session -> session.executeAsync("SELECT release_version FROM system.local"));
// Apply a synchronous computation:
CompletionStage<String> resultStage =
responseStage.thenApply(resultSet -> resultSet.one().getString("release_version"));
// Perform an action once a stage is complete:
resultStage.whenComplete(
(version, error) -> {
if (error != null) {
System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
} else {
System.out.printf("Server version: %s%n", version);
}
sessionStage.thenAccept(CqlSession::closeAsync);
});
您需要参考section about asynchronous paging-您需要提供一个回调,该回调会将数据收集到作为外部对象提供的列表中。文档包含以下示例:
CompletionStage<AsyncResultSet> futureRs =
session.executeAsync("SELECT * FROM myTable WHERE id = 1");
futureRs.whenComplete(this::processRows);
void processRows(AsyncResultSet rs, Throwable error) {
if (error != null) {
// The query failed, process the error
} else {
for (Row row : rs.currentPage()) {
// Process the row...
}
if (rs.hasMorePages()) {
rs.fetchNextPage().whenComplete(this::processRows);
}
}
}
在这种情况下,processRows
可以将数据存储在作为当前对象一部分的列表中,如下所示:
class Abc {
List<Row> rows = new ArrayList<>();
// call to executeAsync
void processRows(AsyncResultSet rs, Throwable error) {
....
for (Row row : rs.currentPage()) {
rows.add(row);
}
....
}
}
但是您在使用select * from table
时需要非常小心,因为它可能返回很多结果,而且如果您有太多的数据,它可能会超时-在这种情况下,最好执行令牌范围扫描(我有[C0 ],但4.x尚无)。