我有三个工作类型的课程,如下所示。
interface Work{
boolean execute();
}
class Service1 implements Work{
@Override
boolean execute(){
//some complex logic
return true;
}
}
class Service2 implements Work{
@Override
boolean execute(){
//some complex logic
return true;
}
}
class Service3 implements Work{
@Override
boolean execute(){
//some complex logic
return true;
}
}
我想将列表中的每一个作为通量进行流式传输,并且仅当当前元素的执行方法返回 true 时才执行下一个服务。
Service1 service1 = new Service1();
Service2 service2 = new Service2();
Service3 service3 = new Service3();
List<Work> serviceList = List.of(service1, service2, service3);
Flux<Work> services = Flux.fromStream(serviceList.stream());
//?? how to process the next element based on current element's result
我可以通过像下面这样链接 flatMap 来使用 Mono 来做到这一点,但我想使用 Flux 来实现这一点。
Mono<Work> service1Mono = Mono.just(service1);
Mono<Work> service2Mono = Mono.just(service2);
Mono<Work> service3Mono = Mono.just(service3);
Mono<Work> workResult = service1Mono.flatMap(a->{
if(a.execute())
return service2Mono;
return Mono.just(a);
}).flatMap(b -> {
if(b.execute())
return service3Mono;
return Mono.just(b);
});
//do something with the workResult
更新: 提供针对我的用例的更多信息。我正在尝试实现一个简单的传奇协调器模式。假设我有三门课
public interface Work {
Mono<WorkResult> execute();
Mono<WorkResult> compensate();
}
public class SomeWork implements Work{
@Override
public Mono<WorkResult> execute() {
WorkResult result = new WorkResult();
result.setName("SomeWork");
result.setValid(true);
return Mono.just(result);
}
@Override
public Mono<WorkResult> compensate() {
return null;
}
}
public class SomeOtherWork implements Work{
@Override
public Mono<WorkResult> execute() {
WorkResult result = new WorkResult();
result.setName("SomeOtherWork");
result.setValid(true);
return Mono.just(result);
}
@Override
public Mono<WorkResult> compensate() {
return null;
}
}
public class SomeMoreWork implements Work{
@Override
public Mono<WorkResult> execute() {
WorkResult result = new WorkResult();
result.setName("SomeMoreWork");
result.setValid(false);
return Mono.just(result);
}
@Override
public Mono<WorkResult> compensate() {
return null;
}
}
我的工作成果是:
public class WorkResult {
private Boolean isValid;
private String name;
public Boolean getValid() {
return isValid;
}
public void setValid(Boolean valid) {
isValid = valid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
这是我的协调器:
public class OrchestratorEngine {
public void start(List<Work> workList){
System.out.println("In orchestrator engine..");
WorkResult workResult = new WorkResult();
workResult.setValid(true);
workResult.setName("DummyWork");
Mono<WorkResult> response = Mono.just(workResult);
for(Work work: workList){
response = response.flatMap(result -> {
if(result.getValid()) {
System.out.println(result.getName());
return work.execute();
}
return Mono.just(result);
});
}
response = response.flatMap(result->{
if(result.getValid()){
System.out.println(result.getName());
}
return Mono.just(result);
});
response.subscribe(result->{
//compensating events start
});
}
}
我想我找到了实现这一目标的方法。我希望这不会以任何方式阻碍。这是代码。
Service1 service1 = new Service1();
Service2 service2 = new Service2();
Service3 service3 = new Service3();
Work initialWork = new Work() {
@Override
public boolean execute() {
return true;
}
};
Mono<Work> response = Mono.just(initialWork);
Mono<Work> service1Mono = Mono.just(service1);
Mono<Work> service2Mono = Mono.just(service2);
Mono<Work> service3Mono = Mono.just(service3);
List<Mono<Work>> workList = List.of(service1Mono, service2Mono, service3Mono);
for(Mono<Work> work: workList){
response = response.flatMap(result -> {
if(result.execute())
return work;
return Mono.just(result);
});
}
//process the final service3Mono
response = response.flatMap(result -> {
System.out.println(result.execute());
return Mono.just(result);
});
response.subscribe();
最初,我创建了一些默认为 true 的虚拟工作。有了这个结果,我将迭代列表中的每个 Mono,执行一个 flatMap,然后仅当当前 Mono 返回 true 时才返回下一个 Mono。最后我将触发最后一个 Mono,以防万一它到达那个点。然后,订阅整个回复。