使用 aws java sdk v2 获取 s3select 的部分 json 响应

问题描述 投票:0回答:2

我正在尝试在 spring boot 应用程序中实现 s3select 以查询 s3 存储桶中的镶木地板文件,我只从 s3select 输出中获得部分结果,请帮助确定问题,我使用了 aws java sdk v2.

检查 json 输出(打印在控制台中),输出中的总字符数为 65k。

我正在使用 eclipse 并尝试取消选中控制台首选项中的“限制控制台输出”,但没有帮助。

代码在这里:-

import java.util.List;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompressionType;
import software.amazon.awssdk.services.s3.model.EndEvent;
import software.amazon.awssdk.services.s3.model.ExpressionType;
import software.amazon.awssdk.services.s3.model.InputSerialization;
import software.amazon.awssdk.services.s3.model.JSONOutput;
import software.amazon.awssdk.services.s3.model.OutputSerialization;
import software.amazon.awssdk.services.s3.model.ParquetInput;
import software.amazon.awssdk.services.s3.model.RecordsEvent;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream.EventType;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;

public class ParquetSelect {
    
    private static final String BUCKET_NAME = "<bucket-name>";
    private static final String KEY = "<object-key>";
    private static final String QUERY = "select * from S3Object s";
    public static S3AsyncClient s3;
    

    public static void selectObjectContent() {
        Handler handler = new Handler();
        
        SelectQueryWithHandler(handler).join();

        RecordsEvent recordsEvent = (RecordsEvent) handler.receivedEvents.stream()
                                                                         .filter(e -> e.sdkEventType() == EventType.RECORDS)
                                                                         .findFirst()
                                                                         .orElse(null);

        System.out.println(recordsEvent.payload().asUtf8String());

    }

    private static CompletableFuture<Void> SelectQueryWithHandler(SelectObjectContentResponseHandler handler) {
        InputSerialization inputSerialization = InputSerialization.builder()
                                                                  .parquet(ParquetInput.builder().build())
                                                                  .compressionType(CompressionType.NONE)
                                                                  .build();


        OutputSerialization outputSerialization = OutputSerialization.builder()
                                                                     .json(JSONOutput.builder().build())
                                                                     .build();


        SelectObjectContentRequest select = SelectObjectContentRequest.builder()
                                                                      .bucket(BUCKET_NAME)
                                                                      .key(KEY)
                                                                      .expression(QUERY)
                                                                      .expressionType(ExpressionType.SQL)
                                                                      .inputSerialization(inputSerialization)
                                                                      .outputSerialization(outputSerialization)
                                                                      .build();

        return s3.selectObjectContent(select, handler);
    }

    private static class Handler implements SelectObjectContentResponseHandler {
        private SelectObjectContentResponse response;
        private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
        private Throwable exception;

        @Override
        public void responseReceived(SelectObjectContentResponse response) {
            this.response = response;
        }

        @Override
        public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
            publisher.subscribe(receivedEvents::add);
        }

        @Override
        public void exceptionOccurred(Throwable throwable) {
            exception = throwable;
        }

        @Override
        public void complete() {
        }
    }
    
}
java amazon-web-services amazon-s3 aws-java-sdk amazon-s3-select
2个回答
0
投票

我看到你正在使用 selectObjectContent()。您是否尝试过调用 s3AsyncClient.getObject() 方法。这对你有用吗?

例如,这里有一个代码示例,它从 Amazon S3 存储桶中获取一个 PDF 文件,并将该 PDF 文件写入本地文件。

package com.example.s3.async;
// snippet-start:[s3.java2.async_stream_ops.complete]

// snippet-start:[s3.java2.async_stream_ops.import]
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
// snippet-end:[s3.java2.async_stream_ops.import]

// snippet-start:[s3.java2.async_stream_ops.main]

/**
 * Before running this Java V2 code example, set up your development environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */

public class S3AsyncStreamOps {

    public static void main(String[] args) {

        final String usage = "\n" +
                "Usage:\n" +
                "    <bucketName> <objectKey> <path>\n\n" +
                "Where:\n" +
                "    bucketName - The name of the Amazon S3 bucket (for example, bucket1). \n\n" +
                "    objectKey - The name of the object (for example, book.pdf). \n" +
                "    path - The local path to the file (for example, C:/AWS/book.pdf). \n" ;

        if (args.length != 3) {
            System.out.println(usage);
            System.exit(1);
         }

        String bucketName = args[0];
        String objectKey = args[1];
        String path = args[2];
        ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create();
        Region region = Region.US_EAST_1;
        S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
                .region(region)
                .credentialsProvider(credentialsProvider)
                .build();

        GetObjectRequest objectRequest = GetObjectRequest.builder()
                .bucket(bucketName)
                .key(objectKey)
                .build();

        CompletableFuture<GetObjectResponse> futureGet = s3AsyncClient.getObject(objectRequest,
                AsyncResponseTransformer.toFile(Paths.get(path)));

        futureGet.whenComplete((resp, err) -> {
            try {
                if (resp != null) {
                    System.out.println("Object downloaded. Details: "+resp);
                } else {
                    err.printStackTrace();
                }
            } finally {
               // Only close the client when you are completely done with it.
                s3AsyncClient.close();
            }
        });
        futureGet.join();
    }
}

0
投票

问题是,您可能会收到多个 RecordsEvent 事件,每个事件都包含一个带有部分数据的 InputStream - 因此您必须遍历事件并加入它们:

fun selectS3ObjectContent(socRequest: SelectObjectContentRequest): InputStream {
    val handler = S3SelectObjectContentHandler()
    asyncS3.selectObjectContent(socRequest, handler).join()

    return if (handler.receivedEvents.filterIsInstance<RecordsEvent>().isEmpty())
        InputStream.nullInputStream()
    else
        handler.receivedEvents.filterIsInstance<RecordsEvent>().map {
            it.payload().asInputStream()
        }.reduce { l, r ->
            SequenceInputStream(l, r)
        }
}

class S3SelectObjectContentHandler : SelectObjectContentResponseHandler {
    private var response: SelectObjectContentResponse? = null
    var receivedEvents = ArrayList<SelectObjectContentEventStream>()
    private var exception: Throwable? = null

    override fun responseReceived(response: SelectObjectContentResponse?) {
        this.response = response
    }

    override fun onEventStream(publisher: SdkPublisher<SelectObjectContentEventStream>) {
        publisher.subscribe { event ->
            receivedEvents.add(event)
        }
    }

    override fun exceptionOccurred(throwable: Throwable?) {
        exception = throwable
    }

    override fun complete() { /* no-op */ }
}
© www.soinside.com 2019 - 2024. All rights reserved.