How do I make asynchronous HTTP calls with Apache Beam (Java)?

Question  Votes: 1  Answers: 2

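The input PCollection contains HTTP requests and is a bounded dataset. I want to make asynchronous HTTP calls (Java) inside a ParDo, parse the responses, and put the results into the output PCollection. My code is below. It fails with the following exception.

I can't figure out the cause. I need some guidance.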

java.util.concurrent.CompletionException: java.lang.IllegalStateException: Can't add element ValueInGlobalWindow{value=streaming.mapserver.backfill.EnrichedPoint@2c59e, pane=PaneInfo.NO_FIRING} to committed bundle in PCollection Call Map Server With Rate Throttle/ParMultiDo(ProcessRequests).output [PCollection]

Code:

public class ProcessRequestsFn extends DoFn<PreparedRequest, EnrichedPoint> {
    private static AsyncHttpClient _HttpClientAsync;
    private static ExecutorService _ExecutorService;

    static {
        AsyncHttpClientConfig cg = config()
                .setKeepAlive(true)
                .setDisableHttpsEndpointIdentificationAlgorithm(true)
                .setUseInsecureTrustManager(true)
                .addRequestFilter(new RateLimitedThrottleRequestFilter(100, 1000))
                .build();

        _HttpClientAsync = asyncHttpClient(cg);

        _ExecutorService = Executors.newCachedThreadPool();
    }

    @DoFn.ProcessElement
    public void processElement(ProcessContext c) {
        PreparedRequest request = c.element();

        if (request == null)
            return;

        _HttpClientAsync.prepareGet(request.getRequest())
                .execute()
                .toCompletableFuture()
                .thenApply(response -> {
                    if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK) {
                        return response.getResponseBody();
                    }
                    return null;
                })
                .thenApply(responseBody -> {
                    List<EnrichedPoint> resList = new ArrayList<>();
                    /* some process logic here */
                    System.out.printf("%d enriched points back\n", resList.size());
                    return resList;
                })
                .thenAccept(resList -> {
                    // This callback runs on an HTTP client thread, possibly after
                    // processElement has already returned.
                    for (EnrichedPoint enrichedPoint : resList) {
                        c.output(enrichedPoint);
                    }
                })
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return null;
                });
    }
}
apache-beam asynchttpclient
2 Answers
0
votes

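The issue you are hitting is that you are outputting outside the context of a processElement or finishBundle call. The callback that calls c.output runs after processElement has returned, by which point the bundle has already been committed.

You'll want to gather all your outputs in memory and emit them eagerly during later processElement calls, and then at the end within finishBundle, by blocking until all your calls have finished.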
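The buffering described above can be sketched in plain Java, independent of Beam. This is only an illustration under stated assumptions: `PendingOutputs`, `track`, `emitReady` and `emitAll` are hypothetical names, not part of Beam or AsyncHttpClient, and the class assumes `track`, `emitReady` and `emitAll` are all called from the thread that runs the DoFn, while only the thread-safe queue is touched from callback threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical helper: buffers async results so they are emitted only by the caller's thread. */
final class PendingOutputs<T> {
    // Calls started in the current bundle that may still be running.
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    // Results delivered by callback threads, waiting to be emitted.
    private final Queue<T> ready = new ConcurrentLinkedQueue<>();

    /** Registers an async call; its callback only writes to the thread-safe queue. */
    void track(CompletableFuture<List<T>> call) {
        // Assumes the call completes with a non-null list.
        inFlight.add(call.thenAccept(ready::addAll));
    }

    /** Emits results that have already arrived; intended for the start of processElement. */
    void emitReady(Consumer<T> emitter) {
        // Forget calls that finished successfully; failed ones stay so emitAll can surface them.
        inFlight.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
        T item;
        while ((item = ready.poll()) != null) {
            emitter.accept(item);
        }
    }

    /** Blocks until every tracked call has finished, then emits the rest; intended for finishBundle. */
    void emitAll(Consumer<T> emitter) {
        // join() rethrows (wrapped in CompletionException) if any tracked call failed.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
        emitReady(emitter);
    }
}
```

In a DoFn, processElement would first call `emitReady(c::output)` and then `track(...)` the future for the new request, without the `thenAccept` that calls `c.output`. A method annotated with `@FinishBundle` would call `emitAll`. Note that Beam's `FinishBundleContext.output` takes a timestamp and a window in addition to the value, so a real implementation would also need to remember those for each buffered element.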


2
votes

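The Scio library implements a DoFn that deals with asynchronous operations. Its BaseAsyncDoFn might provide the handling you need. Since you are dealing with CompletableFuture, also take a look at JavaAsyncDoFn.

Note that you don't necessarily need to use the Scio library itself. You can take the main idea of BaseAsyncDoFn, since it is independent of the rest of Scio.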
