我试图在Java中的websockets之上实现一个请求->响应层。最近,我偶然发现了 RxJava
,这似乎是一个不错的库。下面是我目前处理请求响应流的方法(为了可读性,省略了不重要的代码)。
public class SimpleServer extends WebSocketServer {
Gson gson = new Gson();
Map<String, Function<JsonObject, Void>> requests = new HashMap<>();
private static int count = 0;
public SimpleServer(InetSocketAddress address) {
super(address);
}
@Override
public void onMessage(WebSocket conn, String message) {
String type = ...;
JsonObject payload = ...;
if (type.equals("response")) {
Request request = requests.get(requestId).apply(payload);
}
}
public Single<JsonObject> request(String action) {
requests.put(Integer.toString(count++), response -> {
source.onSuccess(response);
return null;
});
broadcast(...);
}
}
这是一个可行的方案,还是有更好的方法? 我在想,如果有一种方法可以使用 RxJava
两种方式,即请求会监听一个 "onMessage "观察器或类似的东西。如果能得到帮助,我将非常感激。
你可以使用RxJava进行两种方式的通信。让我们从一个更简单的开始--接收消息。我建议你使用 BehaviorRelay
两全其美 Observer
和 Consumer
. 你既可以监听发出的值,也可以产生值--在我们的例子中是消息。一个简单的实现可能是这样的。
public class SimpleServer extends WebSocketServer {
private BehaviorRelay<String> receivedMessages = BehaviorRelay.create();
public SimpleServer(InetSocketAddress address) {
super(address);
}
@Override
public void onMessage(WebSocket conn, String message) {
receivedMessages.accept(message); // "sends" value to the relay
}
public Observable<String> getReceivedMessagesRx() {
return receivedMessages.hide(); // Cast Relay to Observable
}
//...
现在你可以调用函数 getReceivedMessagesRx()
并订阅收到的消息。
现在是更有趣的部分--发送消息。让我们假设,你有一些Observable,产生你想发送的消息。
// ...
private Disposable senderDisposable = Disposables.disposed(); // (1)
public void setMessagesSender(Observable<String> messagesToSend) { // (2)
senderDisposable = messagesToSend.subscribe(message -> {
broadcast(message);
}, throwable -> {
// handle broadcast error
});
}
public void clear() { // (3)
senderDisposable.dispose();
}
}
这里发生了什么:
Disposable
其中持有对要发送的消息的运行观察者的引用。Observable
每当你想发送一条消息时,就会发出一个函数。这个函数只能被调用一次。如果你想多次调用它,请处理之前发送者的处置,或者使用 CompositeDisposable
来存储多个处置器。