使用GRPC传输大量数据的最佳做法是什么?我正在向GRPC服务器发送请求,该请求将流回数据。发送回的数据可以是大约100条protobuf消息,也可以是大约100.000条protobuf消息。
service CrossEngineSelector {
rpc QueryDB (QueryRequest) returns (stream QueryResponse) {}
}
服务器是发送protobuf消息的简单实现。
@Override
public void queryBD(QueryRequest request, StreamObserver<QueryResponse> responseObserver) {
Iterables.partition( dataLoader.getData(), 1000).forEach(batch -> {
responseObserver.onNext(QueryResponse.newBuilder().addAllRows(batch).build());
});
responseObserver.onCompleted();
}
客户端使用blockingStub调用此方法(protobuf生成的代码):
public Iterator<QueryResponse> queryDB(QueryRequest request) {
return ClientCalls.blockingServerStreamingCall(this.getChannel(),
CrossEngineSelectorGrpc.getQueryEnginesMethod(),
this.getCallOptions(), request);
}
一旦客户端调用此方法,我将遍历QueryResponse。
对于所有只发送少量消息的流,所有这些方法都很好。一旦我尝试流式传输100.000条消息,max-inbound-message-size一直在增加,并且最终出现错误:RESOURCE_EXHAUSTED: Compressed gRPC message exceeds maximum size 4194304: 4196022 bytes read
我目前的解决方法是将max-inbound-message-size设置为非常高的+ 1Gb。这是一个硬编码的值,因此不会缩放。客户端不知道服务器将返回多少消息。我可能会遇到用例,即使1Gb max-inbound-message-size不够用。
我希望我犯了一个实现错误。我希望有一种方法可以重设服务器中每个流(onNext())的消息大小,或者正常情况下它会不断增加消息大小?
我假设单个responseObserver.onNext(QueryResponse.newBuilder().addAllRows(batch).build());
发送几个Mb,并且只要它运行,就将其视为消息大小,而不是整个流。
我在服务器和客户端上都使用Micronaut。
maxInboundMessageSize
用于在恶意对等方发送非常大的有效载荷进行攻击时保护接收器内存不足。真正的问题是1000个块可以大于4MB(默认为maxInboundMessageSize
)。
您可以通过两种方式解决此问题,
maxInboundMessageSize
,可以处理正常的1000行数据。后面的方法本质上就是您实现的方法。不过,它应该低于1GB +。正如我上面提到的,gRPC不会使用maxInboundMessageSize
内存量,这纯粹是为了保护。 (稍后添加)