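A Reactor-Netty HttpClient is sending a request (an HttpClientRequest) to an HttpServer. On the server side, I cannot get the request object out of the HttpServerRequest. I can retrieve the Flux<ByteBuf> from the HttpServerRequest, but I cannot retrieve the ByteBuf objects inside that Flux. Normally, subscribing to a Flux would give me the request object, but that does not work here. The response returned by the server is received successfully by the client. Does anyone know why subscribing to a Flux/Mono does not work on the server side of a Reactor-Netty client/server pair?
The client code is: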
    public class Client {
        private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

        public static void main(String[] args) throws InterruptedException {
            Consumer<byte[]> onSuccess = (byte[] response) -> {
                ElectionResponse electionResponse = SerializationUtils.deserialize(response);
                log.info("response in onSuccess: " + electionResponse);
            };
            Consumer<Throwable> onError = (Throwable ex) -> {
                ex.getMessage();
            };
            Runnable onCompletion = () -> {
                System.out.println("Message Completed");
            };

            ElectionRequest electionRequest = new ElectionRequest("aRequest");
            byte[] requestBytes = SerializationUtils.serialize(electionRequest);
            ByteBuf requestByteBuf = Unpooled.copiedBuffer(requestBytes);

            HttpClient.create()
                      .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                      .port(61005)
                      .post()
                      .uri("/echo")
                      .send(Mono.just(requestByteBuf))
                      .responseContent()
                      .aggregate()
                      .asByteArray()
                      .subscribe(onSuccess, onError, onCompletion);

            try {
                Thread.sleep(15000);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
The server code is:
    public class Server {
        private static final Logger log = Logger.getLogger(Server.class.getSimpleName());

        public static void main(String[] args) {
            ElectionResponse electionResponse = new ElectionResponse("aResponse");
            byte[] responseArray = SerializationUtils.serialize(electionResponse);

            Consumer<ByteBuf> onSuccess = (ByteBuf request) -> {
                System.out.println("onSuccess: Request received!");
            };
            Consumer<Throwable> onError = (Throwable ex) -> {
                ex.getMessage();
            };
            Runnable onCompletion = () -> {
                System.out.println("Message Completed");
            };

            DisposableServer server =
                HttpServer.create()
                          .host("10.0.0.19")
                          .port(61005)
                          .route(routes ->
                              routes.post("/echo",
                                  (request, response) -> {
                                      // Separate subscription to the request body; onSuccess is never called here.
                                      request.receive().retain().next().subscribe(onSuccess, onError, onCompletion);
                                      return response.send(Mono.just(Unpooled.copiedBuffer(responseArray).retain()));
                                  }
                              ))
                          .bindNow();

            server.onDispose()
                  .block();
        }
    }
The environment and Maven dependencies are as follows:
Apache Maven 3.6.1
Maven home: /usr/share/maven
Java version: 11.0.6, vendor: Oracle Corporation, runtime: /home/linuxlp/opt/graalvm/graalvm-svm-linux-20.1.0-ea+28
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.3.0-51-generic", arch: "amd64", family: "unix"
    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.9.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.3.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>
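What do you think about composing the request and response into a single chain, like this? The request body is read as part of the publisher passed to response.send(), instead of through a separate subscribe() call: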
    public class Server {
        private static final Logger log = Logger.getLogger(Server.class.getSimpleName());

        public static void main(String[] args) {
            Consumer<byte[]> onSuccess = (byte[] request) -> {
                System.out.println("onSuccess: Request received!");
            };
            Consumer<Throwable> onError = (Throwable ex) -> {
                // Print the failure instead of silently discarding it.
                System.err.println("onError: " + ex.getMessage());
            };
            Runnable onCompletion = () -> {
                System.out.println("Message Completed");
            };

            DisposableServer server =
                HttpServer.create()
                          .host("10.0.0.19")
                          .port(61005)
                          .route(routes ->
                              routes.post("/echo",
                                  (request, response) ->
                                      // The response publisher is derived from the request body,
                                      // so the body is consumed when the response is sent.
                                      response.send(request.receive()
                                                           .aggregate()
                                                           .asByteArray()
                                                           .doOnNext(onSuccess)
                                                           .doOnError(onError)
                                                           .doOnTerminate(onCompletion)
                                                           .flatMap(bytes -> {
                                                               ElectionRequest electionRequest = (ElectionRequest) SerializationUtils.deserialize(bytes);
                                                               ElectionResponse electionResponse = new ElectionResponse(electionRequest.getStr());
                                                               return Mono.just(Unpooled.copiedBuffer(SerializationUtils.serialize(electionResponse)));
                                                           }))))
                          .bindNow();

            server.onDispose()
                  .block();
        }
    }
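The ElectionRequest and ElectionResponse classes are not shown anywhere in this thread, so the snippet above cannot be run as posted. Below is a minimal sketch of what they might look like, assuming both are plain Serializable holders of a single string with a getStr() accessor (which is all the flatMap body relies on). The class name PayloadSketch and the serialize, deserialize and handle helpers are hypothetical; the helpers use only java.io object streams as a stand-in for SerializationUtils, so the byte[] to byte[] step can be tried without Reactor or Netty on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PayloadSketch {

    // Hypothetical request payload; the real ElectionRequest is not shown in the thread.
    static class ElectionRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String str;

        ElectionRequest(String str) { this.str = str; }

        String getStr() { return str; }
    }

    // Hypothetical response payload with the same shape as the request.
    static class ElectionResponse implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String str;

        ElectionResponse(String str) { this.str = str; }

        String getStr() { return str; }

        @Override
        public String toString() { return "ElectionResponse(" + str + ")"; }
    }

    // Stand-in for SerializationUtils.serialize, using standard Java serialization.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Stand-in for SerializationUtils.deserialize.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Same transformation as the flatMap body above: request bytes in, response bytes out.
    static byte[] handle(byte[] requestBytes) throws IOException, ClassNotFoundException {
        ElectionRequest request = (ElectionRequest) deserialize(requestBytes);
        return serialize(new ElectionResponse(request.getStr()));
    }

    public static void main(String[] args) throws Exception {
        byte[] requestBytes = serialize(new ElectionRequest("aRequest"));
        ElectionResponse response = (ElectionResponse) deserialize(handle(requestBytes));
        System.out.println(response); // prints ElectionResponse(aRequest)
    }
}
```

Keeping the handler logic as a plain byte[] to byte[] method like handle makes it easy to check the serialization round trip on its own before wiring it into the response.send(...) chain.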