带有java的akka websocket,计算客户端号码,向客户端发送消息

问题描述 投票:1回答:2

我正在关注akka java websocket教程,试图创建一个websocket服务器。我想实现2个额外的功能:

  • 能够显示已连接客户端的数量,但结果始终为0或1,即使我知道我有100个并发连接的客户端。
  • Websocket通信是双向的。目前,服务器仅在客户端发送消息时响应消息。如何启动从服务器向客户端发送消息?

这是原始的akka​​ java服务器示例代码,对客户端计数实现的最小修改:

public class websocketServer {
private static AtomicInteger connections = new AtomicInteger(0);//connected clients count.

public static class MyTimerTask extends TimerTask {
//called every second to display number of connected clients.
    @Override
    public void run() {
        System.out.println("Conncurrent connections: " + connections);
    }
}


//#websocket-handling
  public static HttpResponse handleRequest(HttpRequest request) {
      HttpResponse result;
      connections.incrementAndGet();
      if (request.getUri().path().equals("/greeter")) {
          final Flow<Message, Message, NotUsed> greeterFlow = greeter();
          result = WebSocket.handleWebSocketRequestWith(request, greeterFlow);
      } else {
          result = HttpResponse.create().withStatus(413); 
      }
      connections.decrementAndGet();
      return result;
  }


public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create();
    TimerTask timerTask = new MyTimerTask();
    Timer timer = new Timer(true);
    timer.scheduleAtFixedRate(timerTask, 0, 1000);
    try {
      final Materializer materializer = ActorMaterializer.create(system);

      final Function<HttpRequest, HttpResponse> handler = request -> handleRequest(request);
      CompletionStage<ServerBinding> serverBindingFuture =
        Http.get(system).bindAndHandleSync(
          handler, ConnectHttp.toHost("****", 1183), materializer);


      // will throw if binding fails
      serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
      System.out.println("Press ENTER to stop.");
      new BufferedReader(new InputStreamReader(System.in)).readLine();
      timer.cancel();
    } catch (Exception e){
        e.printStackTrace();
    }
    finally {
      system.terminate();
    }
  }

  //#websocket-handler

  /**
   * A handler that treats incoming messages as a name,
   * and responds with a greeting to that name
   */
  public static Flow<Message, Message, NotUsed> greeter() {
    return
      Flow.<Message>create()
        .collect(new JavaPartialFunction<Message, Message>() {
              @Override
              public Message apply(Message msg, boolean isCheck) throws Exception {
                if (isCheck) {
                    if (msg.isText()) {
                        return null;
                    } else {
                        throw noMatch();
                    }
                } else {
                    return handleTextMessage(msg.asTextMessage());
                }
              }
        });
  }

  public static TextMessage handleTextMessage(TextMessage msg) {
    if (msg.isStrict()) // optimization that directly creates a simple response...
    {
        return TextMessage.create("Hello " + msg.getStrictText());
    } else // ... this would suffice to handle all text messages in a streaming fashion
    {
        return TextMessage.create(Source.single("Hello ").concat(msg.getStreamedText()));
    }
  }
  //#websocket-handler
}
java concurrency websocket akka
2个回答
0
投票

解决以下2个要点:

1 - 您需要将度量标准附加到消息流 - 而不是HttpRequest流 - 以有效地计算活动连接。你可以使用watchTermination来做到这一点。下面的handleRequest方法的代码示例

public static HttpResponse handleRequest(HttpRequest request) {
  HttpResponse result;
  if (request.getUri().path().equals("/greeter")) {
    final Flow<Message, Message, NotUsed> greeterFlow = greeter().watchTermination((nu, cd) -> {
      connections.incrementAndGet();
      cd.whenComplete((done, throwable) -> connections.decrementAndGet());
      return nu;
    });
    result = WebSocket.handleWebSocketRequestWith(request, greeterFlow);
  } else {
    result = HttpResponse.create().withStatus(413);
  }
  return result;
}

2 - 对于服务器独立发送消息,您可以使用Flow.fromSinkAndSource创建其消息流。下面的例子(这只会发送一条消息):

public static Flow<Message, Message, NotUsed> greeter() {
  return Flow.fromSinkAndSource(Sink.ignore(),
    Source.single(new akka.http.scaladsl.model.ws.TextMessage.Strict("Hello!"))
  );
}

0
投票

handleRequest方法中,您递增然后递减计数器connections,因此在结束时该值始终为0。

public static HttpResponse handleRequest(HttpRequest request) {
      ...
      connections.incrementAndGet();
      ...
      connections.decrementAndGet();
      return result;
  }
© www.soinside.com 2019 - 2024. All rights reserved.