我无法理解 Multi 生产的物品是如何处理的。根据我对 Quarkus 的理解,它们应该(最多)异步执行,因此在某些情况下,不应保留顺序。
我创建了一个小程序来测试我对 Quarkus 的了解:
@Path("/test")
@Slf4j
public class Test {
public void wait(boolean type) {
if (type)
return;
try {
sleep( 10000 );
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public Multi<Integer> generateStream(boolean[] type) {
return Multi.createFrom()
.emitter(emitter -> {
for(int i = 0; i < type.length; i++) {
emitter.emit(i);
}
emitter.complete();
}, BackPressureStrategy.IGNORE
);
}
public String intensiveComputation(Integer data, boolean[] type) {
boolean isFast = type[data];
wait(isFast);
String ret = "";
if (isFast) {
ret = data + " FAST -> DONE";
}
else {
ret = data + " SLOW -> DONE";
}
return ret + " on " + Thread.currentThread().getName() + "\n";
}
public Multi<String> quarkusTest(boolean[] type) {
return generateStream(type)
.onItem().transform(item -> this.intensiveComputation(item, type))
.onItem().invoke(item -> log.info(item));
}
@GET()
@Produces(MediaType.APPLICATION_JSON)
public Multi<String> test() {
boolean[] type = new boolean[5];
type[0] = false; // SLOW
type[1] = false; // SLOW
type[2] = false; // SLOW
type[3] = true; // FAST
type[4] = false; // SLOW
return quarkusTest(type);
}
}
当我调用
curl "http://localhost:8080/test"
时,我得到以下输出:
[0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-0
,2 SLOW -> DONE on vert.x-eventloop-thread-0
,3 FAST -> DONE on vert.x-eventloop-thread-0
,4 SLOW -> DONE on vert.x-eventloop-thread-0
]
但是,我本来期待这样的事情:
[3 FAST -> DONE on vert.x-eventloop-thread-3
,0 SLOW -> DONE on vert.x-eventloop-thread-0
,1 SLOW -> DONE on vert.x-eventloop-thread-1
,2 SLOW -> DONE on vert.x-eventloop-thread-2
,4 SLOW -> DONE on vert.x-eventloop-thread-3
]
我也尝试过
.runSubscriptionOn(executor)
,我用它得到了一些线程,但没有达到预期。
transform()
是同步的,因此它按顺序处理项目,可能会阻塞当前线程。
你应该看看
transformToUniAndMerge()
。