使用反应堆即发即忘

问题描述 投票:0回答:3

我的 Spring boot 应用程序中有如下方法。

public Flux<Data> search(SearchRequest request) {
  Flux<Data> result = searchService.search(request);//this returns Flux<Data>
  Mono<List<Data>> listOfData = result.collectList();
//  doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
  return result;
}

//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
  //do some processing here
}

目前,我正在使用

@Async
注释的服务类和
doThisAsync
,但不知道如何传递
List<Data>
,因为我不想调用
block
。 我只有
Mono<List<Data>>

我的主要问题是如何单独处理这个 Mono,并且

search
方法应该返回
Flux<Data>

java spring spring-boot spring-webflux project-reactor
3个回答
27
投票

1,如果您的即发即忘已经异步返回
Mono
/
Flux

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> doThisAsync(data).subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public Mono<Void> doThisAsync(List<Data> data) {
    //do some async/non-blocking processing here like calling WebClient
}

2,如果您的即发即忘确实阻塞了 I/O

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                                              .subscribeOn(Schedulers.elastic())  // delegate to proper thread to not block main flow
                                              .subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public void doThisAsync(List<Data> data) {
    //do some blocking I/O on calling thread
}

请注意,在上述两种情况下,您都会失去背压支持。如果

doAsyncThis
由于某种原因变慢,那么数据生产者将不会关心并继续生产项目。这是即发即弃机制的自然结果。


3
投票

您是否考虑过使用publishOn在单独的线程中运行处理,如下例所示? 这可能不完全是您所要求的,但允许您继续处理其他事务,同时由一个或多个线程(在我的示例中是四个线程)从专用调度程序(theFourThreadScheduler)完成通量结果的处理。

    @Test
    public void processingInSeparateThreadTest() {
        final Scheduler theFourThreadScheduler = Schedulers.newParallel("FourThreads", 4);
        final Flux<String> theResultFlux = Flux.just("one", "two", "three", "four", "five", "six", "seven", "eight");

        theResultFlux.log()
            .collectList()
            .publishOn(theFourThreadScheduler)
            .subscribe(theStringList -> {
                doThisAsync(theStringList);
            });

        System.out.println("Subscribed to the result flux");

        for (int i = 0; i < 20; i++) {
            System.out.println("Waiting for completion: " + i);
            try {
                Thread.sleep(300);
            } catch (final InterruptedException theException) {
            }
        }
    }

    private void doThisAsync(final List<String> inStringList) {
        for (final String theString : inStringList) {
            System.out.println("Processing in doThisAsync: " + theString);
            try {
                Thread.sleep(500);
            } catch (final InterruptedException theException) {
            }
        }
    }

运行示例会产生以下输出,表明 doThisAsync() 中执行的处理是在后台执行的。

Subscribed to the result flux
Waiting for completion: 0
Processing in doThisAsync: one
Waiting for completion: 1
Processing in doThisAsync: two
Waiting for completion: 2
Waiting for completion: 3
Processing in doThisAsync: three
Waiting for completion: 4
Waiting for completion: 5
Processing in doThisAsync: four
Waiting for completion: 6
Processing in doThisAsync: five
Waiting for completion: 7
Waiting for completion: 8
Processing in doThisAsync: six
Waiting for completion: 9
Processing in doThisAsync: seven
Waiting for completion: 10
Waiting for completion: 11
Processing in doThisAsync: eight
Waiting for completion: 12
Waiting for completion: 13
Waiting for completion: 14
Waiting for completion: 15
Waiting for completion: 16
Waiting for completion: 17
Waiting for completion: 18
Waiting for completion: 19

参考资料: Reactor 3 参考:调度程序


1
投票

更新2023/01/31

实际上你无论如何都应该使用 .subscribeOn() 因为即使你调用返回 Mono<Void>

fire-and-forget
函数也不能保证在该反应链中会切换执行线程或者它会立即发生(取决于“即发即忘”函数内部的代码,更具体地说,取决于链上使用的运算符)。 因此,您可能会遇到这样的情况:您的“即发即忘”函数将在调用该函数的同一线程上执行,因此您的方法将在该函数完成之前不会返回。

即发即忘函数返回

Publisher<Void>

时的情况: public Flux<Data> search(SearchRequest request) { return searchService.search(request) .collectList() .doOnNext(data -> // anyway call subscribeOn(...) fireAndForgetOperation(data) .subscribeOn(...) .subscribe() ) .flatMapMany(Flux::fromIterable); } public Mono<Void> fireAndForgetOperation(List<String> list) { ... }

即发即忘
函数只是一个常见的
void

返回方法时的情况: public Flux<Data> search(SearchRequest request) { return searchService.search(request) .collectList() .doOnNext(data -> Mono.fromRunnable(() -> fireAndForgetOperation(data)) .subscribeOn(...) .subscribe() ) .flatMapMany(Flux::fromIterable); } public void fireAndForgetOperation(List<String> list) { ... }

此外,您还应该考虑需要提供什么 

Scheduler
,具体取决于您的 
即发即忘

功能的性质。

基本上有两种情况:

1)

如果您的

即发即忘

功能确实 CPU-Bound 有效。 然后你想在 Schedulers.parallel() 中指定 subsribeOn()

2)
如果你的

fire-and-forget

函数确实 IO 工作(实际上在这种情况下无论它是阻塞还是非阻塞 IO)。 然后你想在 Schedulers.boundedElastic() 中指定 subsribeOn()

因此,使用这种方法,您将在触发“即发即忘”功能后立即返回
    

© www.soinside.com 2019 - 2024. All rights reserved.