Reactor spring mongodb存储库将多个结果组合在一起

问题描述 投票:0回答:1

我是反应式编程的新手,目前正在研究基于Spring Webflux的应用程序。我陷入了几个问题。

public class FooServiceImpl {

@Autowired
private FooDao fooDao;

@Autowired
private AService aService;

@Autowired
private BService bService;

public long calculateSomething(long fooId) {
    Foo foo = fooDao.findById(fooId); // Blocking call one

    if (foo == null) {
        foo = new Foo();
    }

    Long bCount = bService.getCountBByFooId(fooId); // Blocking call two
    AEntity aEntity = aService.getAByFooId(fooId);  // Blocking call three

    // Do some calculation using foo, bCount and aEntity
    // ...
    // ...

    return someResult;
}
}

这是我们编写使用三个外部API调用结果的阻塞代码的方式(我们认为是DB调用)。我正在努力将其转换为响应代码,如果所有三个都变成单声道,并且如果我订阅了这三个,则外部订户将被阻止吗?

public Mono<Long> calculateSomething(long fooId) {
    return Mono.create(sink -> {
        Mono<Foo> monoFoo = fooDao.findById(fooId); // Reactive call one
        monoFoo.subscribe(foo -> {
            if (foo == null) {
                foo = new Foo();
            }

            Mono<Long> monoCount = bService.getCountBByFooId(fooId);  // Reactive call two

            monoCount.subscribe(aLong -> {
                Mono<AEntity> monoA = aService.getAByFooId(fooId);  // Reactive call three
                monoA.subscribe(aEntity -> {
                    //...
                    //...
                    sink.success(someResult);
                });
            });
        });
    };
  }

我看到有一个名为zip的函数,但是它仅适用于两个结果,所以这里有一种方法可以应用它?

如果我们在create方法中订阅某些内容,还会发生什么,会阻塞线程吗?

如果能帮助我,将非常感谢。

reactive-programming spring-webflux project-reactor reactor reactor-netty
1个回答
0
投票

如果您为我提供了要使用这些值进行的计算,对我来说,显示反应堆的实现方法会更容易。但是,假设您要从数据库中读取一个值,然后将该值用于另一件事。使用平面图并制作一个独特的Flux,从而减少了代码行和复杂性,而无需使用其他人所说的subscription()。示例:

return fooDao.findById(fooId)
.flatmap(foo -> bService.getCountBByFooId(foo))
.flatmap(bCount -> aService.getAByFooId(fooId).getCount()+bCount);
© www.soinside.com 2019 - 2024. All rights reserved.