在WebSockets中使用RxJava的请求响应层

问题描述 投票:0回答:1

我试图在Java中的websockets之上实现一个请求->响应层。最近,我偶然发现了 RxJava,这似乎是一个不错的库。下面是我目前处理请求响应流的方法(为了可读性,省略了不重要的代码)。

public class SimpleServer extends WebSocketServer {
  Gson gson = new Gson();
  Map<String, Function<JsonObject, Void>> requests = new HashMap<>();

  private static int count = 0;

  public SimpleServer(InetSocketAddress address) {
    super(address);
  }

  @Override
  public void onMessage(WebSocket conn, String message) {
    String type = ...;
    JsonObject payload = ...;
    if (type.equals("response")) {
      Request request = requests.get(requestId).apply(payload);
    }
  }

  public Single<JsonObject> request(String action) {
    requests.put(Integer.toString(count++), response -> {
      source.onSuccess(response);
      return null;
    });
    broadcast(...);
  }
}

这是一个可行的方案,还是有更好的方法? 我在想,如果有一种方法可以使用 RxJava 两种方式,即请求会监听一个 "onMessage "观察器或类似的东西。如果能得到帮助,我将非常感激。

java rx-java java-websocket
1个回答
0
投票

你可以使用RxJava进行两种方式的通信。让我们从一个更简单的开始--接收消息。我建议你使用 BehaviorRelay 两全其美 ObserverConsumer. 你既可以监听发出的值,也可以产生值--在我们的例子中是消息。一个简单的实现可能是这样的。

public class SimpleServer extends WebSocketServer {
    private BehaviorRelay<String> receivedMessages = BehaviorRelay.create();

    public SimpleServer(InetSocketAddress address) {
        super(address);
    }

    @Override
    public void onMessage(WebSocket conn, String message) {
        receivedMessages.accept(message); // "sends" value to the relay
    }

    public Observable<String> getReceivedMessagesRx() {
        return receivedMessages.hide(); // Cast Relay to Observable
    }

    //...

现在你可以调用函数 getReceivedMessagesRx() 并订阅收到的消息。

现在是更有趣的部分--发送消息。让我们假设,你有一些Observable,产生你想发送的消息。

    // ...

    private Disposable senderDisposable = Disposables.disposed(); // (1)

    public void setMessagesSender(Observable<String> messagesToSend) { // (2)
        senderDisposable = messagesToSend.subscribe(message -> { 
            broadcast(message);
        }, throwable -> {
            // handle broadcast error 
        });
    }

    public void clear() { // (3)
        senderDisposable.dispose(); 
    }
}

这里发生了什么:

  1. 创建 Disposable 其中持有对要发送的消息的运行观察者的引用。
  2. 订阅通过的 Observable 每当你想发送一条消息时,就会发出一个函数。这个函数只能被调用一次。如果你想多次调用它,请处理之前发送者的处置,或者使用 CompositeDisposable 来存储多个处置器。
  3. 当你完成对服务器的工作后,不要忘记处置消息发送者。
© www.soinside.com 2019 - 2024. All rights reserved.