I'm using Quarkus and Mutiny in a test application.
I create a Web client as shown below to interact with other microservices.
WebClient client = WebClient.create(vertx, webClientOptions);
The client is then used as shown below. This is just one example; the same Web client instance is used for all other outbound interactions.
private static final String URL = "https://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks";

@GET
@Path("/web")
public Uni<JsonArray> retrieveDataFromWikipedia() {
    return client.getAbs(URL).send()
            .onItem().transform(HttpResponse::bodyAsJsonObject)
            .onItem().transform(json -> json.getJsonObject("parse")
                    .getJsonArray("langlinks"));
}
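For every outbound REST request, I want to add a logging correlation ID to the request headers, so I'm looking into whether an interceptor can be added to the Web client.
But I couldn't find a way to do it. The Web client doesn't expose any interceptor methods.
Is there any way to achieve this?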
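The Vert.x Web Client has an internal API (WebClientInternal) that lets you attach interceptors to every request and response.
The following example is based on Quarkus 3.3.0.Final (Vert.x 4.4.4, SmallRye Mutiny Vert.x bindings 3.5.0) and uses the RESTEasy Reactive extension: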
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
and
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
as managed dependencies.
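import java.util.Random;
import java.util.UUID;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.client.impl.ClientPhase;
import io.vertx.ext.web.client.impl.HttpContext;
import io.vertx.ext.web.client.impl.WebClientInternal;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.HttpResponse;
import io.vertx.mutiny.ext.web.client.WebClient;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")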
public class ExampleResource {

    // Scheme changed to http on purpose so that a redirect response can be intercepted
    private static final String URL = "http://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks";

    // Used only to delay outgoing requests
    private final Random random = new Random();

    @Inject
    Vertx vertx; // io.vertx.mutiny.core.Vertx

    @GET
    @Path("/intercepted") // matches the URL used in the curl call
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<JsonArray> intercepted() {
        // Created per call to keep the example short; in real code create the client once and reuse it
        var options = new WebClientOptions();
        var client = WebClient.create(vertx, options);

        // Unwrap the internal API to add interceptors
        var delegate = (WebClientInternal) client.getDelegate();
        delegate
                .addInterceptor(ExampleResource::phaseInterceptor)
                .addInterceptor(this::correlationInterceptor);

        return client.getAbs(URL).send()
                .onItem().transform(HttpResponse::bodyAsJsonObject)
                .onItem().transform(json -> json.getJsonObject("parse")
                        .getJsonArray("langlinks"));
    }

    // ...
}
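Keep in mind that each interceptor is invoked several times per request, once for every phase, and again for repeated phases in cases such as redirects or retries. The interceptor itself therefore has to check which phase it is in.
phaseInterceptor logs a different message for each phase to demonstrate this: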
private static void phaseInterceptor(HttpContext<?> context) {
    String msg = switch (context.phase()) {
        case PREPARE_REQUEST -> "Preparing request to: " + context.request().host();
        case CREATE_REQUEST -> "Creating request to: " + context.request().uri();
        case SEND_REQUEST -> "Sending " + context.request().method() + " request to " + context.request().host();
        case FOLLOW_REDIRECT -> "Redirecting to: " + context.clientResponse().getHeader("Location");
        case RECEIVE_RESPONSE -> "Got " + context.clientResponse().statusMessage() + " response";
        case DISPATCH_RESPONSE ->
                "Reading " + context.clientResponse().getHeader("Content-Length") + " byte(s) length response";
        case FAILURE -> "Something went wrong";
    };
    Log.info(msg);
    context.next();
}
Don't forget to call context.next() so that processing continues with the next interceptor in the chain.
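To illustrate why context.next() matters, here is a minimal plain-Java sketch of a chain in which each interceptor has to hand control onward explicitly. ChainSketch and all of its methods are hypothetical stand-ins written for this illustration; this is not how Vert.x implements its chain, only the same general idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an interceptor chain; NOT the Vert.x implementation.
final class ChainSketch {
    private final List<Consumer<ChainSketch>> interceptors;
    private final List<String> trace = new ArrayList<>();
    private int index = 0;
    private boolean completed = false;

    ChainSketch(List<Consumer<ChainSketch>> interceptors) {
        this.interceptors = interceptors;
    }

    // Hands control to the next interceptor, or marks the chain complete when none are left.
    void next() {
        if (index < interceptors.size()) {
            interceptors.get(index++).accept(this);
        } else {
            completed = true;
        }
    }

    void log(String message) { trace.add(message); }
    List<String> trace() { return trace; }
    boolean completed() { return completed; }

    // Starts a new chain with the given interceptors and returns it for inspection.
    static ChainSketch run(List<Consumer<ChainSketch>> interceptors) {
        ChainSketch chain = new ChainSketch(interceptors);
        chain.next();
        return chain;
    }

    public static void main(String[] args) {
        Consumer<ChainSketch> first = c -> { c.log("first"); c.next(); };
        Consumer<ChainSketch> forgetful = c -> c.log("forgetful"); // never calls next()
        Consumer<ChainSketch> last = c -> { c.log("last"); c.next(); };

        ChainSketch ok = run(List.of(first, last));
        System.out.println(ok.trace() + " completed=" + ok.completed());
        // prints: [first, last] completed=true

        ChainSketch stalled = run(List.of(first, forgetful, last));
        System.out.println(stalled.trace() + " completed=" + stalled.completed());
        // prints: [first, forgetful] completed=false
    }
}
```

In this model the "forgetful" interceptor leaves the chain stuck: nothing after it runs and the chain never completes, which is the kind of hang to expect when an interceptor skips next().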
When the endpoint is called with
curl --head -v http://localhost:8080/hello/intercepted
phaseInterceptor logs something like the following:
2023-08-27 22:35:17,304 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Preparing request to: en.wikipedia.org
2023-08-27 22:35:17,307 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Creating request to: /w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
2023-08-27 22:35:17,505 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Sending GET request to en.wikipedia.org
2023-08-27 22:35:17,563 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Redirecting to: https://en.wikipedia.org/w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
2023-08-27 22:35:17,564 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Creating request to: /w/api.php?action=parse&page=Quarkus&format=json&prop=langlinks
2023-08-27 22:35:17,894 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Sending GET request to en.wikipedia.org
2023-08-27 22:35:18,070 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got OK response
2023-08-27 22:35:18,079 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Reading 440 byte(s) length response
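In the log above, the redirect makes "Creating request" and "Sending GET request" appear twice, while "Preparing request" appears only once. The sketch below replays that sequence in plain Java and counts the invocations per phase. PhaseSketch, its Phase enum, and phasesFor are hypothetical names, and the sequence is modelled on this one log rather than taken from Vert.x internals, so treat it as an illustration only.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the phase order seen in the log above; not Vert.x code.
final class PhaseSketch {
    enum Phase {
        PREPARE_REQUEST, CREATE_REQUEST, SEND_REQUEST,
        FOLLOW_REDIRECT, RECEIVE_RESPONSE, DISPATCH_RESPONSE
    }

    // Replays the phases for one logical request that is redirected `redirects` times.
    static List<Phase> phasesFor(int redirects) {
        List<Phase> phases = new ArrayList<>();
        phases.add(Phase.PREPARE_REQUEST);          // once per logical request
        for (int hop = 0; hop <= redirects; hop++) {
            phases.add(Phase.CREATE_REQUEST);       // once per HTTP request actually sent
            phases.add(Phase.SEND_REQUEST);
            if (hop < redirects) {
                phases.add(Phase.FOLLOW_REDIRECT);
            }
        }
        phases.add(Phase.RECEIVE_RESPONSE);
        phases.add(Phase.DISPATCH_RESPONSE);
        return phases;
    }

    // Counts how many times an interceptor would be invoked for each phase.
    static Map<Phase, Integer> countInvocations(List<Phase> phases) {
        Map<Phase, Integer> counts = new EnumMap<>(Phase.class);
        for (Phase phase : phases) {
            counts.merge(phase, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countInvocations(phasesFor(1)));
        // prints: {PREPARE_REQUEST=1, CREATE_REQUEST=2, SEND_REQUEST=2, FOLLOW_REDIRECT=1, RECEIVE_RESPONSE=1, DISPATCH_RESPONSE=1}
    }
}
```

Under this model, anything that should happen once per logical request, such as generating a correlation ID, fits best in PREPARE_REQUEST; putting it in CREATE_REQUEST would run it again after each redirect.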
The Vert.x HttpContext is useful for keeping any data around for the lifetime of a request.
private void correlationInterceptor(HttpContext<?> context) {
    if (context.phase() == ClientPhase.PREPARE_REQUEST) {
        var correlationId = UUID.randomUUID();
        // 1..500 ms; setTimer rejects a delay below 1 ms
        var delay = 1 + random.nextLong(500);
        Log.infof("Request of id [%s] delayed %4d milliseconds", correlationId, delay);
        vertx.setTimer(delay, l -> {
            // Store the unique identifier as a context attribute
            context.set("CorrelationId", correlationId);
            // (Optional) also send it as a request header if needed
            context.request().putHeader("X-Correlation-Id", correlationId.toString());
            context.next();
        });
    } else if (context.phase() == ClientPhase.RECEIVE_RESPONSE) {
        // Read the correlation ID back from the context data
        var correlationId = context.get("CorrelationId");
        // (Optional) also expose it as a response header if needed
        context.clientResponse().headers().add("X-Correlation-Id", correlationId.toString());
        Log.infof("Got response for correlation id: [%s]", correlationId);
        context.next();
    } else {
        context.next();
    }
}
2023-08-27 23:01:10,278 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [ea8d37fe-19a8-431c-91a3-553a2987f800] delayed 104 milliseconds
2023-08-27 23:01:10,719 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [ea8d37fe-19a8-431c-91a3-553a2987f800]
2023-08-27 23:01:10,725 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Request of id [7d920c38-ccba-4f27-a95e-594c23ac2fa2] delayed 73 milliseconds
2023-08-27 23:01:10,725 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [0862f366-a6a4-4327-8f54-a6c98437db68] delayed 53 milliseconds
2023-08-27 23:01:10,727 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Request of id [ea79effa-b225-441c-a9fc-d359fb4e597f] delayed 210 milliseconds
2023-08-27 23:01:10,727 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Request of id [7a8197da-4ea3-4002-a95f-a389841596fc] delayed 204 milliseconds
2023-08-27 23:01:11,112 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [0862f366-a6a4-4327-8f54-a6c98437db68]
2023-08-27 23:01:11,153 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got response for correlation id: [7d920c38-ccba-4f27-a95e-594c23ac2fa2]
2023-08-27 23:01:11,257 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-0) Got response for correlation id: [7a8197da-4ea3-4002-a95f-a389841596fc]
2023-08-27 23:01:11,298 INFO [io.git.zfo.ExampleResource] (vert.x-eventloop-thread-1) Got response for correlation id: [ea79effa-b225-441c-a9fc-d359fb4e597f]
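The interleaved log lines above show several requests in flight at once, each getting back the ID it started with. A rough plain-Java sketch of why per-request context storage gives that result follows. RequestContext, onPrepare, and onResponse are hypothetical names used only for this illustration; they are not Vert.x types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration of per-request attribute storage; not Vert.x code.
final class ContextSketch {

    // Stand-in for a per-request context: each request owns its own attribute map.
    static final class RequestContext {
        private final Map<String, Object> attributes = new HashMap<>();

        void set(String key, Object value) { attributes.put(key, value); }

        @SuppressWarnings("unchecked")
        <T> T get(String key) { return (T) attributes.get(key); }
    }

    // What the interceptor does before the request is sent: generate and store the ID.
    static UUID onPrepare(RequestContext context) {
        UUID correlationId = UUID.randomUUID();
        context.set("CorrelationId", correlationId);
        return correlationId;
    }

    // What the interceptor does when the response arrives: read the ID back.
    static UUID onResponse(RequestContext context) {
        return context.get("CorrelationId");
    }

    public static void main(String[] args) {
        RequestContext a = new RequestContext();
        RequestContext b = new RequestContext();
        UUID idA = onPrepare(a);
        UUID idB = onPrepare(b);

        // Responses arrive in a different order than the requests were prepared.
        System.out.println(idB.equals(onResponse(b))); // prints: true
        System.out.println(idA.equals(onResponse(a))); // prints: true
    }
}
```

Had the ID been kept in a single field on the resource class instead, the second onPrepare call would have overwritten the first ID before its response arrived.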
Finally, here is a simplified version of correlationInterceptor without the artificial delay.
private void correlationInterceptor(HttpContext<?> context) {
    if (context.phase() == ClientPhase.PREPARE_REQUEST) {
        var correlationId = UUID.randomUUID();
        context.set("CorrelationId", correlationId);
        context.request().putHeader("X-Correlation-Id", correlationId.toString());
    } else if (context.phase() == ClientPhase.RECEIVE_RESPONSE) {
        var correlationId = context.get("CorrelationId");
        context.clientResponse().headers().add("X-Correlation-Id", correlationId.toString());
        Log.infof("Got response for correlation id: [%s]", correlationId);
    }
    context.next();
}