我是 AWS 的初学者。
我的客户允许我访问 EC2 实例,并允许我使用 Athena 访问一些表。 这些表位于 AWSDataCatalog 中。我对表格来源没有太多信息。
我只想从我的 EC2 实例中的这些查询中获取一些数据。
我浏览了 Athena 及其 API 的文档,但没有找到这个问题的答案。
我是应该连接到 Athena 服务来获取数据,还是应该直接连接到原始数据源(也就是说,Athena 只是 AWS 提供的一个查询编辑器)?
请帮忙。
Amazon Athena 是一项独立的 AWS 服务,就像 S3、DynamoDB 等服务一样。您可以使用 Amazon Athena API 从中检索数据。例如,假设您想要使用 AWS SDK for Java v2 编写代码,可以使用下面的代码检索数据。
package aws.example.athena;
//snippet-start:[athena.java2.StartQueryExample.import]
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.QueryExecutionContext;
import software.amazon.awssdk.services.athena.model.ResultConfiguration;
import software.amazon.awssdk.services.athena.model.StartQueryExecutionRequest;
import software.amazon.awssdk.services.athena.model.StartQueryExecutionResponse;
import software.amazon.awssdk.services.athena.model.AthenaException;
import software.amazon.awssdk.services.athena.model.GetQueryExecutionRequest;
import software.amazon.awssdk.services.athena.model.GetQueryExecutionResponse;
import software.amazon.awssdk.services.athena.model.QueryExecutionState;
import software.amazon.awssdk.services.athena.model.GetQueryResultsRequest;
import software.amazon.awssdk.services.athena.model.GetQueryResultsResponse;
import software.amazon.awssdk.services.athena.model.ColumnInfo;
import software.amazon.awssdk.services.athena.model.Row;
import software.amazon.awssdk.services.athena.model.Datum;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.util.List;
//snippet-end:[athena.java2.StartQueryExample.import]
/**
* Before running this Java V2 code example, set up your development environment, including your credentials.
*
* For more information, see the following documentation topic:
*
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
*/
//snippet-start:[athena.java2.StartQueryExample.main]
public class StartQueryExample {
    // Demonstrates the full life cycle of an Amazon Athena query through the AWS SDK for
    // Java v2: submit the query, poll until it reaches a terminal state, then page through
    // the result set and print every value.

    /**
     * Builds an Athena client, runs the sample query and prints its results.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while waiting between status polls
     */
    public static void main(String[] args) throws InterruptedException {
        // try-with-resources closes the client even when the query fails or is cancelled
        // and waitForQueryToComplete throws.
        try (AthenaClient athenaClient = AthenaClient.builder()
                .region(Region.US_WEST_2)
                .credentialsProvider(ProfileCredentialsProvider.create())
                .build()) {
            String queryExecutionId = submitAthenaQuery(athenaClient);
            waitForQueryToComplete(athenaClient, queryExecutionId);
            processResultRows(athenaClient, queryExecutionId);
        }
    }

    /**
     * Submits a sample query to Amazon Athena and returns the execution ID of the query.
     *
     * <p>If the service call throws an {@link AthenaException}, the stack trace is printed
     * and the JVM exits with status 1.
     *
     * @param athenaClient the client used to start the query
     * @return the ID of the started query execution
     */
    public static String submitAthenaQuery(AthenaClient athenaClient) {
        try {
            // The QueryExecutionContext allows us to set the database.
            QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
                    .database(ExampleConstants.ATHENA_DEFAULT_DATABASE)
                    .build();

            // The result configuration specifies where the results of the query should go.
            ResultConfiguration resultConfiguration = ResultConfiguration.builder()
                    .outputLocation(ExampleConstants.ATHENA_OUTPUT_BUCKET)
                    .build();

            StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
                    .queryString(ExampleConstants.ATHENA_SAMPLE_QUERY)
                    .queryExecutionContext(queryExecutionContext)
                    .resultConfiguration(resultConfiguration)
                    .build();

            StartQueryExecutionResponse startQueryExecutionResponse =
                    athenaClient.startQueryExecution(startQueryExecutionRequest);
            return startQueryExecutionResponse.queryExecutionId();
        } catch (AthenaException e) {
            e.printStackTrace();
            System.exit(1);
        }
        // Only present to satisfy the compiler: System.exit above does not return normally.
        return "";
    }

    /**
     * Polls an Amazon Athena query until it succeeds, fails or is cancelled.
     *
     * @param athenaClient     the client used to fetch the query status
     * @param queryExecutionId the ID of the query execution to wait for
     * @throws RuntimeException     if the query ends in the FAILED or CANCELLED state
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public static void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) throws InterruptedException {
        GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
                .queryExecutionId(queryExecutionId)
                .build();

        boolean isQueryStillRunning = true;
        while (isQueryStillRunning) {
            GetQueryExecutionResponse getQueryExecutionResponse =
                    athenaClient.getQueryExecution(getQueryExecutionRequest);
            // Compare the enum directly instead of round-tripping through toString().
            QueryExecutionState queryState = getQueryExecutionResponse.queryExecution().status().state();
            switch (queryState) {
                case FAILED:
                    throw new RuntimeException("The Amazon Athena query failed to run with error message: "
                            + getQueryExecutionResponse.queryExecution().status().stateChangeReason());
                case CANCELLED:
                    throw new RuntimeException("The Amazon Athena query was cancelled.");
                case SUCCEEDED:
                    isQueryStillRunning = false;
                    break;
                default:
                    // Any other state: sleep an amount of time before retrying again.
                    Thread.sleep(ExampleConstants.SLEEP_AMOUNT_IN_MS);
                    break;
            }
            System.out.println("The current status is: " + queryState);
        }
    }

    /**
     * Retrieves the results of a query, page by page, and prints every value.
     *
     * <p>If the service call throws an {@link AthenaException}, the stack trace is printed
     * and the JVM exits with status 1.
     *
     * @param athenaClient     the client used to fetch the results
     * @param queryExecutionId the ID of the query execution whose results are read
     */
    public static void processResultRows(AthenaClient athenaClient, String queryExecutionId) {
        try {
            // Max Results can be set but if it's not set,
            // it will choose the maximum page size.
            GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
                    .queryExecutionId(queryExecutionId)
                    .build();

            GetQueryResultsIterable getQueryResultsResults =
                    athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
            for (GetQueryResultsResponse result : getQueryResultsResults) {
                List<ColumnInfo> columnInfoList = result.resultSet().resultSetMetadata().columnInfo();
                List<Row> results = result.resultSet().rows();
                processRow(results, columnInfoList);
            }
        } catch (AthenaException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Prints the varchar value of every datum in every row of one result page.
     *
     * @param row            the rows of the current result page
     * @param columnInfoList column metadata for the page (currently unused)
     */
    private static void processRow(List<Row> row, List<ColumnInfo> columnInfoList) {
        for (Row myRow : row) {
            List<Datum> allData = myRow.data();
            for (Datum data : allData) {
                System.out.println("The value of the column is " + data.varCharValue());
            }
        }
    }
    //snippet-end:[athena.java2.StartQueryExample.main]
}
您可以在 AWS Github 中找到此示例和其他示例:
https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/example_code/athena