根据之前的元素结果对通量元素执行操作

问题描述 投票:0回答:1

我有三个工作类型的课程,如下所示。

interface Work{
        boolean execute();
    }

    class Service1 implements Work{

        @Override
        boolean execute(){
            //some complex logic
            return true;
        }
    }

    class Service2 implements Work{

        @Override
        boolean execute(){
            //some complex logic
            return true;
        }
    }

    class Service3 implements Work{

        @Override
        boolean execute(){
            //some complex logic
            return true;
        }
    }

我想将列表中的每一个作为通量进行流式传输,并且仅当当前元素的执行方法返回 true 时才执行下一个服务。

Service1 service1 = new Service1();
Service2 service2 = new Service2();
Service3 service3 = new Service3();
List<Work> serviceList = List.of(service1, service2, service3);
Flux<Work> services = Flux.fromStream(serviceList.stream());

//?? how to process the next element based on current element's result

我可以通过像下面这样链接 flatMap 来使用 Mono 来做到这一点,但我想使用 Flux 来实现这一点。

Mono<Work> service1Mono = Mono.just(service1);
Mono<Work> service2Mono = Mono.just(service2);
Mono<Work> service3Mono = Mono.just(service3);
Mono<Work> workResult = service1Mono.flatMap(a->{
  if(a.execute())
       return service2Mono;
  return Mono.just(a);
}).flatMap(b -> {
  if(b.execute())
        return service3Mono;
  return Mono.just(b);
});
//do something with the workResult

更新: 提供针对我的用例的更多信息。我正在尝试实现一个简单的传奇协调器模式。假设我有三门课

public interface Work {

    Mono<WorkResult> execute();

    Mono<WorkResult> compensate();
}

public class SomeWork implements Work{
    @Override
    public Mono<WorkResult> execute() {
        WorkResult result = new WorkResult();
        result.setName("SomeWork");
        result.setValid(true);
        return Mono.just(result);
    }

    @Override
    public Mono<WorkResult> compensate() {
        return null;
    }
}

public class SomeOtherWork implements Work{
    @Override
    public Mono<WorkResult> execute() {
        WorkResult result = new WorkResult();
        result.setName("SomeOtherWork");
        result.setValid(true);
        return Mono.just(result);
    }

    @Override
    public Mono<WorkResult> compensate() {
        return null;
    }
}

public class SomeMoreWork  implements Work{
    @Override
    public Mono<WorkResult> execute() {
        WorkResult result = new WorkResult();
        result.setName("SomeMoreWork");
        result.setValid(false);
        return Mono.just(result);
    }

    @Override
    public Mono<WorkResult> compensate() {
        return null;
    }
}

我的工作成果是:

public class WorkResult {

    private Boolean isValid;
    private String name;

    public Boolean getValid() {
        return isValid;
    }

    public void setValid(Boolean valid) {
        isValid = valid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

这是我的协调器:

public class OrchestratorEngine {

    public void start(List<Work> workList){

        System.out.println("In orchestrator engine..");
        WorkResult workResult = new WorkResult();
        workResult.setValid(true);
        workResult.setName("DummyWork");
        Mono<WorkResult> response = Mono.just(workResult);
        for(Work work: workList){
            response = response.flatMap(result -> {
                if(result.getValid()) {
                    System.out.println(result.getName());
                    return work.execute();
                }
                return Mono.just(result);
            });
        }

        response = response.flatMap(result->{
            if(result.getValid()){
                System.out.println(result.getName());
            }
            return Mono.just(result);
        });

        response.subscribe(result->{
            //compensating events start
        });
    }
}
java reactive-programming project-reactor flux
1个回答
0
投票

我想我找到了实现这一目标的方法。我希望这不会以任何方式阻碍。这是代码。

Service1 service1 = new Service1();
    Service2 service2 = new Service2();
    Service3 service3 = new Service3();

    Work initialWork = new Work() {
        @Override
        public boolean execute() {
            return true;
        }
    };
    Mono<Work> response = Mono.just(initialWork);

    Mono<Work> service1Mono = Mono.just(service1);
    Mono<Work> service2Mono = Mono.just(service2);
    Mono<Work> service3Mono = Mono.just(service3);

    List<Mono<Work>> workList = List.of(service1Mono, service2Mono, service3Mono);
    for(Mono<Work> work: workList){
        response = response.flatMap(result -> {
            if(result.execute())
                return work;
            return Mono.just(result);
        });
    }

    //process the final service3Mono
    response = response.flatMap(result -> {
        System.out.println(result.execute());
        return Mono.just(result);
    });

    response.subscribe();

最初,我创建了一些默认为 true 的虚拟工作。有了这个结果,我将迭代列表中的每个 Mono,执行一个 flatMap,然后仅当当前 Mono 返回 true 时才返回下一个 Mono。最后我将触发最后一个 Mono,以防万一它到达那个点。然后,订阅整个回复。

© www.soinside.com 2019 - 2024. All rights reserved.