Spring Boot gRPC: ServerInterceptor that reads data from the request and sets it in the response

Question

Every request proto in my gRPC service contains a field called "metadata" (not to be confused with gRPC Metadata):

message MyRequest {
  RequestResponseMetadata metadata = 1;
  ...
}

The same field is present in every response as well:

message MyResponse {
  RequestResponseMetadata metadata = 1;
  ...
}

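I am trying to write a ServerInterceptor (or anything else, if it works) that reads the "metadata" field from the request, keeps it somewhere, and sets it on the response once the request has been processed.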

Attempt 1: ThreadLocal

public class ServerInterceptor implements io.grpc.ServerInterceptor {

  private ThreadLocal<RequestResponseMetadata> metadataThreadLocal = new ThreadLocal<>();

  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call,
      final Metadata requestHeaders,
      ServerCallHandler<ReqT, RespT> next) {
    return new SimpleForwardingServerCallListener<ReqT>(
        next.startCall(
            new SimpleForwardingServerCall<ReqT, RespT>(call) {
              @Override
              public void sendMessage(RespT message) {
                super.sendMessage(
                    (RespT)
                        MetadataUtils.setMetadata(
                            (GeneratedMessageV3) message, metadataThreadLocal.get()));
                metadataThreadLocal.remove();
              }
            },
            requestHeaders)) {
      @Override
      public void onMessage(ReqT request) {
        // todo nava see if ReqT can extend GenericV3Message
        var metadata = MetadataUtils.getMetadata((GeneratedMessageV3) request);
        metadataThreadLocal.set(metadata);
        super.onMessage(request);
      }
    };
  }
}

I tried using a ThreadLocal, only to realise later that sendMessage and onMessage need not run on the same thread.
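To make the thread issue concrete, here is a minimal sketch that uses only the JDK, not gRPC. The class name ThreadLocalVisibilityDemo, the METADATA field, and the two single-thread executors are assumptions made for illustration: they stand in for the interceptor's metadataThreadLocal and for two different executor threads that might run onMessage and sendMessage.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalVisibilityDemo {

  // Stand-in for the interceptor's metadataThreadLocal field.
  private static final ThreadLocal<String> METADATA = new ThreadLocal<>();

  /** Stores the value on one thread, then reads the ThreadLocal on a different thread. */
  public static String readOnAnotherThread(String value) {
    ExecutorService writer = Executors.newSingleThreadExecutor();
    ExecutorService reader = Executors.newSingleThreadExecutor();
    try {
      Runnable write = () -> METADATA.set(value); // plays the role of onMessage
      Callable<String> read = METADATA::get;      // plays the role of sendMessage
      writer.submit(write).get();                 // wait until the value has been stored
      return reader.submit(read).get();           // a different thread sees its own empty slot
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      writer.shutdown();
      reader.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(readOnAnotherThread("request-metadata")); // prints "null"
  }
}
```

The value written by the first thread is never visible to the second one, which matches the behaviour seen when the two callbacks happen to land on different threads.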

Attempt 2: gRPC Context

public class ServerInterceptor implements io.grpc.ServerInterceptor {

  public static final Context.Key<RequestResponseMetadata> METADATA_KEY = Context.key("metadata");

  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call,
      final Metadata requestHeaders,
      ServerCallHandler<ReqT, RespT> next) {
    return new SimpleForwardingServerCallListener<ReqT>(
        next.startCall(
            new SimpleForwardingServerCall<ReqT, RespT>(call) {
              @Override
              public void sendMessage(RespT message) {
                super.sendMessage(
                    (RespT)
                        MetadataUtils.setMetadata(
                            (GeneratedMessageV3) message, METADATA_KEY.get()));
              }
            },
            requestHeaders)) {
      @Override
      public void onMessage(ReqT request) {
        var metadata = MetadataUtils.getMetadata((GeneratedMessageV3) request);
        var newContext = Context.current().withValue(METADATA_KEY, metadata);
        oldContext = newContext.attach();
        super.onMessage(request);
      }
    };
  }
}

I planned to detach the context in onComplete(), but before execution gets there, METADATA_KEY.get() in sendMessage returns null, whereas I expected it to return the data.

Even before sendMessage() is reached, I get the following in the console, which suggests I am doing something wrong:

3289640 [grpc-default-executor-0] ERROR i.g.ThreadLocalContextStorage - Context was not attached when detaching
java.lang.Throwable: null
    at io.grpc.ThreadLocalContextStorage.detach(ThreadLocalContextStorage.java:48)
    at io.grpc.Context.detach(Context.java:421)
    at io.grpc.Context$CancellableContext.detach(Context.java:761)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:39)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

How can I read data when the request is received, store it somewhere, and use it when sending the response back?

multithreading grpc interceptor thread-local grpc-java
1 Answer

You can use the call's Metadata to carry the value from the request to the response:

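import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

// MetadataUtils is the question's own helper class, not io.grpc.stub.MetadataUtils.
// Its getMetadata/setMetadata are assumed to work with byte[] here (see the note below).
public class MetadataServerInterceptor implements ServerInterceptor {

    // Binary header keys must end in "-bin".
    public static final Metadata.Key<byte[]> METADATA_KEY = Metadata.Key.of("metadata-bin", Metadata.BINARY_BYTE_MARSHALLER);

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        // Wrap the call so that every outgoing response gets the stored metadata.
        var serverCall = new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
            @Override
            public void sendMessage(RespT message) {
                byte[] metadata = headers.get(METADATA_KEY);
                message = (RespT) MetadataUtils.setMetadata((GeneratedMessageV3) message, metadata);
                super.sendMessage(message);
            }
        };
        ServerCall.Listener<ReqT> listenerWithContext = Contexts.interceptCall(Context.current(), serverCall, headers, next);
        // Wrap the listener so that the incoming request's metadata is stored in the headers.
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(listenerWithContext) {
            @Override
            public void onMessage(ReqT message) {
                byte[] metadata = MetadataUtils.getMetadata((GeneratedMessageV3) message);
                headers.put(METADATA_KEY, metadata);
                super.onMessage(message);
            }
        };
    }
}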

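Note: since an instance of RequestResponseMetadata cannot be put into Metadata directly (at least not without implementing a custom marshaller), you can store it as a byte array. Call toByteArray() on the RequestResponseMetadata object to get a byte[], and use RequestResponseMetadata.parseFrom(byte[]) to get the object back from the byte[].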
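If the byte[] round trip is unwanted, a possible alternative is to keep the value in a holder object created inside interceptCall, since that method is invoked once per call and the holder is therefore never shared between calls. The sketch below models only that idea with plain JDK types. PerCallHolderDemo, CallHooks, extractMetadata, and the "id:payload" string format are all hypothetical stand-ins: in a real interceptor the two hooks would be the overridden onMessage of the listener and the overridden sendMessage of the forwarding call, and the holder would contain a RequestResponseMetadata.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class PerCallHolderDemo {

  /** Hypothetical stand-in for one intercepted call: a request hook and a response hook. */
  public static final class CallHooks {
    public final Consumer<String> onMessage;        // plays the role of Listener.onMessage
    public final UnaryOperator<String> sendMessage; // plays the role of ServerCall.sendMessage

    CallHooks(Consumer<String> onMessage, UnaryOperator<String> sendMessage) {
      this.onMessage = onMessage;
      this.sendMessage = sendMessage;
    }
  }

  /** Plays the role of interceptCall: each invocation creates a fresh holder for one call. */
  public static CallHooks interceptCall() {
    // AtomicReference so that a value set on one thread is safely visible on another.
    AtomicReference<String> holder = new AtomicReference<>();
    return new CallHooks(
        request -> holder.set(extractMetadata(request)),
        response -> response + ";metadata=" + holder.get());
  }

  // Hypothetical helper: the "metadata" is the text before the first ':' of the request.
  static String extractMetadata(String request) {
    return request.substring(0, request.indexOf(':'));
  }

  /** Runs the request hook on a separate thread and the response hook on the caller's thread. */
  public static String handleOnTwoThreads(String request, String response) {
    CallHooks call = interceptCall();
    Thread requestThread = new Thread(() -> call.onMessage.accept(request));
    requestThread.start();
    try {
      requestThread.join(); // the response is only sent after the request was seen
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return call.sendMessage.apply(response);
  }

  public static void main(String[] args) {
    System.out.println(handleOnTwoThreads("trace-1:ping", "pong")); // prints "pong;metadata=trace-1"
  }
}
```

Unlike the ThreadLocal attempt, the holder is tied to the call rather than to a thread, so it does not matter which threads run the two hooks.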
