vertx处理程序如何工作?

问题描述 投票:0回答:2

在上周,我阅读了有关vertx的文档。我不知道vertx处理程序是如何工作的?例如

public class Client extends AbstractVerticle{

    @Override
    public void start() throws Exception {
       final HttpClient httpClient = this.vertx.createHttpClient();
       this.vertx.setPeriodic(1000, handler->{
           httpClient.getNow(8080, "localhost", "/", responseHandler -> {
                System.out.println("response");
            });
       });
    }

}

服务器是:

public class JdbcVertx extends AbstractVerticle{

    @Override
    public void start() throws Exception {
        JDBCClient client = JDBCClient.createNonShared(this.vertx, new JsonObject()
                                .put("url", "jdbc:postgresql://localhost:5432/test")
                                .put("user", "user")
                                .put("password", "password")
                                .put("driver_class", "org.postgresql.Driver")
                                .put("max_pool_size", 30));
        this.vertx.createHttpServer()
                .requestHandler(r -> {
                    client.getConnection(handler -> {                     
                        final SQLConnection connection = handler.result();
                        connection.execute(execute(), hndlr -> {
                                connection.close(closehndlr -> {                                 
                                        r.response().putHeader("content-type", "text/html").end("Response");
                                });                                   
                        });
                    });
                }).listen(8080);
    }

    private String execute(){
            return "insert into rubish (name) values ('test')";      
    }
}

(P.S我知道我首先应该检查处理程序是否成功然后做出一些操作,但是我删除了这个检查以简化代码,并且如果在30秒内没有任何响应则会从官方文档中删除处理程序中的异常)

从上面的代码中,客户端每秒发送请求并且不等待响应,但它有一个处理程序,将在响应提交时执行。

'JdbcVertx'侦听端口8080,获取请求,插入数据库,例如3秒(我将1_000_000行放入db并创建索引以减慢插入时间)然后发送响应,因此每个请求都是非阻塞的。

据我所知,vertx只有一个名为EventLoop事件循环的线程来自jdbcVertx get reqests但不立即返回响应,而是放置了一个处理程序,当db插入成功时将执行该处理程序。事件循环如何知道IO操作何时完成。我认为它使用这样的东西

if(Thread.currentThread().getState != 'blocked' && sec != 30){
    this.object.getHandler().execute();
} else if(sec == 30){
 Thread.currentThread.inerrupt();
  } else{
    sec++;
  }

但是我们只有一个线程,当我们有阻塞调用时,它没有一个线程,只有一个处理程序。

问题是,事件循环如何知道阻塞操作何时结束并且是时候执行处理程序了

java asynchronous vert.x event-loop
2个回答
0
投票

回答这个问题:

事件循环如何知道阻塞操作何时结束并且是时候执行处理程序了?

根据非阻塞模型,呼叫时的事件循环

connection.execute( execute(), hndlr )

生成一个新线程,执行你的阻塞代码,并在它完成时(类似于Thread.join())在事件循环线程中调用hndlr回调。因此,尽管可以执行阻塞代码,但主循环不会被阻止。


3
投票

但是我们只有一个线程,当我们有阻塞调用时,它没有一个线程,只有一个处理程序。它是如何工作的,如果我们可以使用处理程序,为什么我们需要使用Worker Verticle呢?

处理程序只是在收到eventbus消息或http调用时触发的操作。它们并非旨在为您处理可扩展性。如果您只使用处理程序,如果您的操作开始需要很长时间,或者您的请求数量有任何增加,您将阻止Verticle的eventloop并且会有很多Thread xxxx has been blocked警告。

要回答处理程序如何工作以及为什么事件循环不等待处理程序的结束来启动另一个处理程序,根据这个:https://vertx.io/docs/vertx-core/java/#_reactor_and_multi_reactor

每个Vertx实例都维护多个事件循环,而不是单个事件循环。默认情况下,我们根据计算机上可用核心的数量选择数字,但这可以被覆盖。

这意味着与Node.js不同,单个Vertx进程可以跨服务器进行扩展。

我们将这种模式称为多反应器模式,以区别于单螺纹反应器模式。

但是在我看来,这还不足以为你处理所有的可扩展性和线程阻塞问题,你也应该读一读:https://vertx.io/docs/vertx-core/java/#golden_rule

设计Verticle的方法有很多,但您必须尽可能保持非阻塞状态。在我看来,使用vert.x与传统的阻塞方法(例如阻止restfull端点)是不相关的。

就个人而言,我设计我的Verticle如下:

  • verticle A:暴露一个restfull端点并获取一个回调url(无论是什么动作GET / POST / PUT / PATCH / DELETE)。 Verticle总是立即响应202 Accepted而没有结果,并将eventbus中的消息发送到Verticle B.
  • vertical B:获取消息,执行操作(最终与eventbus异步调用其他顶点并等待回复)并回复调用回调url。

我会避免使用worker verticleexecuteBlocking方法,甚至创建一个线程池。我有权将我的Verticle B(在单独的pid中)的实例乘以监听相同的eventbus集群(最终是带有http反向代理的Verticle A)。我们甚至可以想象具有可变数量的Verticle B实例(在单独的pid中),具体取决于实时请求的数量。

P.S:有时我使用更强大的消息代理工具,如Apache Kafka而不是本机事件总线(当我需要尊重某种消息时,或者当我需要重放某些消息时)。

© www.soinside.com 2019 - 2024. All rights reserved.