RXJava顺序执行可观察的

问题描述 投票:0回答:1

我有多个函数返回Observable<String>。每个功能在文件系统上执行命令。我需要一个接一个地执行每个函数,并在可观察到的位置获得函数的输出。最后,我想要一个Observable<String>,其中包含按函数调用顺序显示的所有函数的输出

单独地,每个功能按预期工作,但我需要正确合并输出。

我像这样尝试Observable.concatArray(func1,func2,...):

    return Observable.concatArray(
        func1(),
        func2(),
        func3(), 
        func4()
    );

但是这只是保留了可观察事件的顺序。不是功能的顺序。我的意思是,如果func1发出事件A和A',而func2发出事件B和B',则我将得到A-> A'-> B-> B'。但是func2将在func1之后立即启动。这导致我出现问题,需要先完成func1才能启动func2。

第一个函数通过maven在文件系统上生成目录。因此,任务期限很长。第二,在此目录中写入文件。但是concatArray在第一个之后立即启动第二个。并且该命令失败,因为该目录目前不存在。

有没有办法避免这样的丑陋的事情:

Subject<String> result = PublishSubject.create();
Observable<String> func1Obs = funct1(); 
Observable<String> func2Obs = funct2(); 

func1Obs.subscribe(output -> result.onNext(output));
func1Obs.onDoComplete(() -> {
    func2Obs.subscribe(output -> result.onNext(output);
}
return result;
java rx-java rx-java3
1个回答
0
投票

正如建议Progman,错误不是concatArray,这是要使用的方法。问题是,在我的函数列表中,我正在使用这种代码:

public Observable<String> func1() {
    Subject<String> result = PublishSubject.create();
    String output = dosomething()
    result.onNext(output);
}

这里的问题是创建可观察对象时立即调用doSomething()函数。

解决方案是,如果需要onNext,onComplete等,请使用Observable.create()::

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.create( result -> {
        String output = dosomething()
        result.onNext(output);   
    });
}

Observable.defer(),如果您只需要等待订阅:

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.defer( () -> dosomething());
}

此后您可以致电:

return Observable.concatArray(
    func1(),
    func2(),
    func3(), 
    func4()
);
© www.soinside.com 2019 - 2024. All rights reserved.