Quarkus Multi<> 多线程

问题描述 投票:0回答:1

我无法理解 Multi 生产的物品是如何处理的。根据我对 Quarkus 的理解,它们应该(最多)异步执行,因此在某些情况下,不应保留顺序。

我创建了一个小程序来测试我对 Quarkus 的了解:

@Path("/test")
@Slf4j
public class Test {
    
    public void wait(boolean type) {
        if (type)
            return;
        try {
            sleep( 10000 );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public Multi<Integer> generateStream(boolean[] type) {
        return Multi.createFrom()
                .emitter(emitter -> {
                    for(int i = 0; i < type.length; i++) {
                        emitter.emit(i);
                    }
                    emitter.complete();
                }, BackPressureStrategy.IGNORE
                );
    }

    public String intensiveComputation(Integer data, boolean[] type) {
        boolean isFast = type[data];
        wait(isFast);
        String ret = "";
        if (isFast) {
            ret = data + " FAST -> DONE";
        }
        else {
            ret = data + " SLOW -> DONE";
        }
        return ret + " on " + Thread.currentThread().getName() + "\n";
    }

    public Multi<String> quarkusTest(boolean[] type) {
        return generateStream(type)
                .onItem().transform(item -> this.intensiveComputation(item, type))
                .onItem().invoke(item -> log.info(item));
    }
    
    @GET()
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<String> test() {
        boolean[] type = new boolean[5];
        type[0] = false;  // SLOW
        type[1] = false;  // SLOW
        type[2] = false;  // SLOW
        type[3] = true;   // FAST
        type[4] = false;  // SLOW
        return quarkusTest(type);
    }
}

当我调用

curl "http://localhost:8080/test"
时,我得到以下输出:

[0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-0
,2 SLOW -> DONE on vert.x-eventloop-thread-0
,3 FAST -> DONE on vert.x-eventloop-thread-0
,4 SLOW -> DONE on vert.x-eventloop-thread-0
]

但是,我本来期待这样的事情:

[3 FAST -> DONE on vert.x-eventloop-thread-3
,0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-1
,2 SLOW -> DONE on vert.x-eventloop-thread-2
,4 SLOW -> DONE on vert.x-eventloop-thread-3
]

我也尝试过

.runSubscriptionOn(executor)
,我用它得到了一些线程,但没有达到预期。

java multithreading quarkus reactive vertex
1个回答
0
投票

transform()
是同步的,因此它按顺序处理项目,可能会阻塞当前线程。

你应该看看

transformToUniAndMerge()

© www.soinside.com 2019 - 2024. All rights reserved.