Custom interceptor in the Quarkus Mutiny web client

Question

I am using Quarkus and Mutiny in a test application.

The web client below is created to interact with other microservices.

https://smallrye.io/smallrye-mutiny-vertx-bindings/2.16.2/apidocs/io/vertx/mutiny/ext/web/client/WebClient.html#create(io.vertx.mutiny.core.Vertx,io.vertx.ext.web.client.WebClientOptions)

WebClient client = WebClient.create(vertx, webClientOptions);

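The web client is used to interact with other microservices as shown below. This is just one example; the same web client is used for all other client interactions.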

private static final String URL ="https://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks";
        
@GET
@Path("/web")
public Uni<JsonArray> retrieveDataFromWikipedia() {                     
    return client.getAbs(URL).send()                                    
                 .onItem().transform(HttpResponse::bodyAsJsonObject)         
                 .onItem().transform(json -> json.getJsonObject("parse")     
                                                .getJsonArray("langlinks"));
}

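For every outbound REST request, I want to add a logging correlation ID to the request headers, so I am exploring whether an interceptor can be added to the web client. I could not find a way to do it: the WebClient has no interceptor methods.
Is there any way to achieve this?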

web quarkus interceptor vert.x mutiny
1 Answer

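Vert.x Web Client has an internal API (WebClientInternal) for attaching interceptors to every request and response.

The following example is based on Quarkus 3.3.0.Final (Vert.x 4.4.4, SmallRye Mutiny Vert.x bindings 3.5.0) and uses the RESTEasy Reactive extension: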

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>

Both are managed dependencies.

Stage 1 - Attaching the interceptors

@Path("/hello")
public class ExampleResource {

    // Scheme changed to http on purpose, so that a redirect response can be intercepted
    private static final String URL = "http://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks";

    // Used only to delay outgoing requests
    private final Random random = new Random();

    @Inject
    Vertx vertx; // io.vertx.mutiny.core.Vertx;

    @GET
    @Path("/intercepted") // matches the curl call to /hello/intercepted
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<JsonArray> intercepted() {

        var options = new WebClientOptions();
        var client = WebClient.create(vertx, options);

        // Unwrap internal API to add interceptors
        var delegate = (WebClientInternal) client.getDelegate();
        delegate
                .addInterceptor(ExampleResource::phaseInterceptor)
                .addInterceptor(this::correlationInterceptor);

        return client.getAbs(URL).send()
                .onItem().transform(HttpResponse::bodyAsJsonObject)
                .onItem().transform(json -> json.getJsonObject("parse")
                        .getJsonArray("langlinks"));
    }

// ...
}

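Keep in mind that interceptors are invoked several times, once for each phase, and that phases can repeat (for example on redirects or retries), so the interceptor itself has to handle that.

phaseInterceptor logs a different message in each phase to demonstrate them: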

private static void phaseInterceptor(HttpContext<?> context) {

    String msg = switch (context.phase()) {
        case PREPARE_REQUEST -> "Preparing request to: " + context.request().host();
        case CREATE_REQUEST -> "Creating request to: " + context.request().uri();
        case SEND_REQUEST -> "Sending " + context.request().method() + " request to " + context.request().host();
        case FOLLOW_REDIRECT -> "Redirecting to: " + context.clientResponse().getHeader("Location");
        case RECEIVE_RESPONSE -> "Got " + context.clientResponse().statusMessage() + " response";
        case DISPATCH_RESPONSE ->
                "Reading " + context.clientResponse().getHeader("Content-Length") + " byte(s) length response";
        case FAILURE -> "Something went wrong";
    };
    Log.info(msg);
    context.next();
}

Don't forget to call context.next() to continue with the next interceptor in the chain.
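
To illustrate why the next() call matters, here is a minimal plain-Java model of an interceptor chain. It is only a sketch: the Context class, its fields and the run helper are hypothetical stand-ins written for this example and are not the Vert.x HttpContext API. It shows that an interceptor which never calls next() leaves the chain stalled, so the request is never sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class InterceptorChainSketch {

    /** Hypothetical stand-in for a request context; NOT the Vert.x HttpContext. */
    static final class Context {
        private final List<Consumer<Context>> interceptors;
        private int index = 0;
        final List<String> trace = new ArrayList<>();
        boolean requestSent = false;

        Context(List<Consumer<Context>> interceptors) {
            this.interceptors = interceptors;
        }

        /** Hands control to the next interceptor, or "sends" the request once the chain is exhausted. */
        void next() {
            if (index < interceptors.size()) {
                interceptors.get(index++).accept(this);
            } else {
                requestSent = true;
            }
        }
    }

    /** Runs the chain from the first interceptor and returns the resulting context. */
    static Context run(List<Consumer<Context>> interceptors) {
        Context context = new Context(interceptors);
        context.next();
        return context;
    }

    public static void main(String[] args) {
        Consumer<Context> logging = ctx -> { ctx.trace.add("logging"); ctx.next(); };
        Consumer<Context> correlation = ctx -> { ctx.trace.add("correlation"); ctx.next(); };
        // This interceptor does its work but never calls next()
        Consumer<Context> forgetful = ctx -> ctx.trace.add("forgetful");

        Context ok = run(List.of(logging, correlation));
        System.out.println(ok.trace + " sent=" + ok.requestSent);

        Context stalled = run(List.of(logging, forgetful, correlation));
        System.out.println(stalled.trace + " sent=" + stalled.requestSent);
    }
}
```

In this model the first chain ends with sent=true, while the second stops after the forgetful interceptor and the correlation interceptor is never reached.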

When the endpoint is called with

curl --head -v http://localhost:8080/hello/intercepted

phaseInterceptor logs something like this:

2023-08-27 22:35:17,304 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Preparing request to: en.wikipedia.org
2023-08-27 22:35:17,307 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Creating request to: /w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
2023-08-27 22:35:17,505 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Sending GET request to en.wikipedia.org
2023-08-27 22:35:17,563 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Redirecting to: https://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
2023-08-27 22:35:17,564 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Creating request to: /w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
2023-08-27 22:35:17,894 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Sending GET request to en.wikipedia.org
2023-08-27 22:35:18,070 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got OK response
2023-08-27 22:35:18,079 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Reading 440 byte(s) length response
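
In the log above, "Creating request" and "Sending GET request" each appear twice because of the redirect. The following plain-Java sketch replays that phase sequence to show one possible way of guarding work that should happen only once per logical call. It is a model under stated assumptions: the Phase enum only mirrors the phase names used in phaseInterceptor, while RequestState, the "audited" attribute and both interceptor methods are hypothetical and not part of Vert.x.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepeatedPhaseSketch {

    /** Mirrors the phase names used in phaseInterceptor; defined locally for this sketch. */
    enum Phase {
        PREPARE_REQUEST, CREATE_REQUEST, SEND_REQUEST, FOLLOW_REDIRECT,
        RECEIVE_RESPONSE, DISPATCH_RESPONSE, FAILURE
    }

    /** Hypothetical per-request state, standing in for attributes kept on a request context. */
    static final class RequestState {
        final Map<String, Object> attributes = new HashMap<>();
        final List<String> auditLog = new ArrayList<>();
    }

    /** Phase order taken from the redirect log: the create and send phases run twice. */
    static final List<Phase> REDIRECTED_CALL = List.of(
            Phase.PREPARE_REQUEST, Phase.CREATE_REQUEST, Phase.SEND_REQUEST,
            Phase.FOLLOW_REDIRECT, Phase.CREATE_REQUEST, Phase.SEND_REQUEST,
            Phase.RECEIVE_RESPONSE, Phase.DISPATCH_RESPONSE);

    /** Records the call once, even when SEND_REQUEST is reached again after a redirect. */
    static void guardedInterceptor(Phase phase, RequestState state) {
        if (phase == Phase.SEND_REQUEST
                && state.attributes.putIfAbsent("audited", Boolean.TRUE) == null) {
            state.auditLog.add("request sent");
        }
    }

    /** Same work without the guard, to make the duplication visible. */
    static void naiveInterceptor(Phase phase, RequestState state) {
        if (phase == Phase.SEND_REQUEST) {
            state.auditLog.add("request sent");
        }
    }

    public static void main(String[] args) {
        RequestState guarded = new RequestState();
        RequestState naive = new RequestState();
        for (Phase phase : REDIRECTED_CALL) {
            guardedInterceptor(phase, guarded);
            naiveInterceptor(phase, naive);
        }
        System.out.println("guarded entries: " + guarded.auditLog.size());
        System.out.println("naive entries: " + naive.auditLog.size());
    }
}
```

With this sequence the guarded variant records one entry and the naive variant records two. Whether a given piece of work should run once per logical call or once per physical request depends on the use case.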

Stage 2 - Correlating requests and responses

The Vert.x HttpContext is useful for keeping any data for the lifetime of a request.

private void correlationInterceptor(HttpContext<?> context) {
    if (context.phase() == ClientPhase.PREPARE_REQUEST) {
        var correlationId = UUID.randomUUID();
        var delay = 1 + random.nextLong(500); // setTimer rejects delays below 1 ms
        Log.infof("Request of id [%s] delayed %4d milliseconds", correlationId, delay);
        vertx.setTimer(delay, l -> {
            // Set unique identifier as a context attribute
            context.set("CorrelationId", correlationId);
            // (Optional) set the header value if needed
            context.request().putHeader("X-Correlation-Id", correlationId.toString());
            context.next();
        });
    } else if (context.phase() == ClientPhase.RECEIVE_RESPONSE) {
        // Read shared correlationId from context data
        var correlationId = context.get("CorrelationId");
        // (Optional) set the header value if needed
        context.clientResponse().headers().add("X-Correlation-Id", correlationId.toString());
        Log.infof("Got response for correlation id: [%s]", correlationId);
        context.next();
    } else {
        context.next();
    }
}

With several requests in flight, the log looks like this:

2023-08-27 23:01:10,278 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [ea8d37fe-19a8-431c-91a3-553a2987f800] delayed  104 milliseconds
2023-08-27 23:01:10,719 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [ea8d37fe-19a8-431c-91a3-553a2987f800]
2023-08-27 23:01:10,725 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Request of id [7d920c38-ccba-4f27-a95e-594c23ac2fa2] delayed   73 milliseconds
2023-08-27 23:01:10,725 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [0862f366-a6a4-4327-8f54-a6c98437db68] delayed   53 milliseconds
2023-08-27 23:01:10,727 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [ea79effa-b225-441c-a9fc-d359fb4e597f] delayed  210 milliseconds
2023-08-27 23:01:10,727 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Request of id [7a8197da-4ea3-4002-a95f-a389841596fc] delayed  204 milliseconds
2023-08-27 23:01:11,112 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [0862f366-a6a4-4327-8f54-a6c98437db68]
2023-08-27 23:01:11,153 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got response for correlation id: [7d920c38-ccba-4f27-a95e-594c23ac2fa2]
2023-08-27 23:01:11,257 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got response for correlation id: [7a8197da-4ea3-4002-a95f-a389841596fc]
2023-08-27 23:01:11,298 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [ea79effa-b225-441c-a9fc-d359fb4e597f]
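
Because the responses above arrive in a different order than the requests were issued, the correlation ID is what ties each pair together. As a rough illustration, the plain-Java sketch below pairs "Request of id" and "Got response" lines by ID and computes the elapsed time. The class name, the regular expression and the elapsedMillis helper are assumptions made for this example; the sample lines are copied from the log above, and the measured time includes the artificial delay.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CorrelationLogSketch {

    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    // Captures the timestamp, the kind of message and the ID in square brackets
    private static final Pattern LINE = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) .*?"
            + "(Request of id|Got response for correlation id:) \\[([0-9a-f-]{36})\\]");

    /** Returns elapsed milliseconds per correlation ID, in the order the responses were logged. */
    static Map<String, Long> elapsedMillis(List<String> lines) {
        Map<String, LocalDateTime> started = new LinkedHashMap<>();
        Map<String, Long> elapsed = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // not a correlation log line
            }
            LocalDateTime at = LocalDateTime.parse(m.group(1), TIMESTAMP);
            String id = m.group(3);
            if (m.group(2).startsWith("Request")) {
                started.put(id, at);
            } else if (started.containsKey(id)) {
                elapsed.put(id, Duration.between(started.get(id), at).toMillis());
            }
        }
        return elapsed;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "2023-08-27 23:01:10,278 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [ea8d37fe-19a8-431c-91a3-553a2987f800] delayed  104 milliseconds",
            "2023-08-27 23:01:10,719 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [ea8d37fe-19a8-431c-91a3-553a2987f800]",
            "2023-08-27 23:01:10,725 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [0862f366-a6a4-4327-8f54-a6c98437db68] delayed   53 milliseconds",
            "2023-08-27 23:01:11,112 INFO  [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [0862f366-a6a4-4327-8f54-a6c98437db68]");
        elapsedMillis(lines).forEach((id, ms) -> System.out.println(id + " took " + ms + " ms"));
    }
}
```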

Finally, here is a simplified version of correlationInterceptor, without the delay.

private void correlationInterceptor(HttpContext<?> context) {
    if (context.phase() == ClientPhase.PREPARE_REQUEST) {
        var correlationId = UUID.randomUUID();
        context.set("CorrelationId", correlationId);
        context.request().putHeader("X-Correlation-Id", correlationId.toString());
    } else if (context.phase() == ClientPhase.RECEIVE_RESPONSE) {
        var correlationId = context.get("CorrelationId");
        context.clientResponse().headers().add("X-Correlation-Id", correlationId.toString());
        Log.infof("Got response for correlation id: [%s]", correlationId);
    }
    context.next();
}