Getting the body of an error response with the Spring Reactive Elasticsearch client


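I am using Spring Data Elasticsearch to fetch some aggregations, and I am trying to do it with ReactiveElasticsearchClient. I get a 500 error from Elasticsearch, but I can't figure out how to get the body of the response so that I can debug what is wrong with my request. Here is what I have so far: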

    final Flux<Aggregation> resp = client.aggregate(request -> {
        final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.queryStringQuery(query)).size(0)
                .aggregation(AggregationBuilders.composite("report", compositeValueBuilders));

        request.indices(index).source(sourceBuilder);

    });

    resp.doOnError(throwable -> {
        throwable.printStackTrace();
    }).log().doOnEach(signal -> {
        if (signal.hasError()) {
            signal.getThrowable().printStackTrace();
        } else {
            final Aggregation agg = signal.get();
            System.out.println(agg.getType());
            System.out.println(agg.getClass());
            System.out.println(agg.getName());
        }
    }).blockLast();

I have enabled trace logging for org.springframework.data.elasticsearch.client.wire. The request body is logged, but the log line for the response looks like this:

2020-06-09 14:12:46.625 TRACE 19999 --- [or-http-epoll-1] o.s.data.elasticsearch.client.wire : [4ed5a037] Received raw response: 500 INTERNAL_SERVER_ERROR

I get the following stack trace. I can see the ElasticsearchStatusException, but I don't see anything I can do with the Signal object to get the response body.

org.elasticsearch.ElasticsearchStatusException: POST request to /<index name>/_search returned error code 500.
    at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.handleServerError(DefaultReactiveElasticsearchClient.java:809) ~[spring-data-elasticsearch-4.0.0.RELEASE.jar:4.0.0.RELEASE]
    at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:760) ~[spring-data-elasticsearch-4.0.0.RELEASE.jar:4.0.0.RELEASE]
    at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.lambda$sendRequest$22(DefaultReactiveElasticsearchClient.java:680) ~[spring-data-elasticsearch-4.0.0.RELEASE.jar:4.0.0.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:156) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:385) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2317) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:143) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:182) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onNext(FluxRetryPredicate.java:82) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:156) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
    at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:428) ~[reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:514) ~[reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
    at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onStateChange(PooledConnectionProvider.java:536) ~[reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
    at reactor.netty.resources.PooledConnectionProvider$PooledConnection.onStateChange(PooledConnectionProvider.java:427) ~[reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:562) ~[reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311) ~[netty-codec-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:425) ~[netty-codec-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-codec-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.49.Final-linux-x86_64.jar:4.1.49.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.49.Final-linux-x86_64.jar:4.1.49.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.49.Final-linux-x86_64.jar:4.1.49.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.49.Final.jar:4.1.49.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.49.Final.jar:4.1.49.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

TL;DR: how do I get the body of an error response when using Spring's ReactiveElasticsearchClient?

spring-data-elasticsearch
1 answer

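You should enable logging by setting the log level of org.springframework.data.elasticsearch.client.WIRE to TRACE. This logs what is sent and received, including in the error case (excerpt from the implementation):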

private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
        Class<T> responseType) {

    // ...

    if (response.statusCode().is5xxServerError()) {

        ClientLogger.logRawResponse(logId, response.statusCode());
        return handleServerError(request, response);
    }

    if (response.statusCode().is4xxClientError()) {

        ClientLogger.logRawResponse(logId, response.statusCode());
        return handleClientError(logId, request, response, responseType);
    }

    // ...
}
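
For reference, the logger level mentioned above can be set in configuration. The fragment below is a sketch that assumes a Spring Boot application configured through application.properties; the question does not say how logging is set up, so adapt it to your logging framework if that assumption does not hold.

```properties
# Assumes Spring Boot; enables trace output for the Elasticsearch client wire logger
logging.level.org.springframework.data.elasticsearch.client.WIRE=TRACE
```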

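Edit:

Sorry, my mistake: I should have seen that the response body is not logged.

What you can do is use an intercepting proxy to monitor the traffic between your application and Elasticsearch. Two such tools are OWASP ZAP Proxy and Burp Suite Community Edition. Both start up as a proxy on port 8080, so you either need to configure your application to listen on a different port (the server.port property; my test program uses 9090) or change the proxy port. Besides that, you need to configure the proxy in your application. My configuration looks like this: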

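import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.config.AbstractReactiveElasticsearchConfiguration;

@Configuration
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {
    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
            .connectedTo("localhost:9200") //
            .withProxy("localhost:8080") // route Elasticsearch traffic through the intercepting proxy
            .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}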

When you access your application, the traffic to Elasticsearch goes through the proxy, where you can see both the requests and the responses.
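
If setting up a proxy is not convenient, another option may be to replay the request outside the reactive client, since the wire trace log already contains the request body. The sketch below is not part of Spring Data Elasticsearch; it uses only the JDK 11+ java.net.http client. The class name ReplaySearch, the index name my-index, and the default body are hypothetical placeholders, and it assumes Elasticsearch is reachable without authentication.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ReplaySearch {

    /** POSTs the JSON body to the URL and returns "<status>\n<body>", whatever the status is. */
    static String replay(String url, String jsonBody) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
        // BodyHandlers.ofString() reads the body for error statuses as well;
        // the JDK client does not throw on a 4xx or 5xx response.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + "\n" + response.body();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical defaults: replace with the host, index and request body
        // copied from the wire trace log.
        String url = args.length > 0 ? args[0] : "http://localhost:9200/my-index/_search";
        String body = args.length > 1 ? args[1] : "{\"size\":0}";
        System.out.println(replay(url, body));
    }
}
```

Run it with the URL and the logged request body as arguments; the JSON error that Elasticsearch returns alongside the 500 status is printed after the status code.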
