我有一个用例,我想将Observable<M>
中的数据与Maybe<N>
中的最新值合并。带Maybe
的API是我无法控制的外部API。实际上,我想做的是Maybe
更改时从Observable
懒惰地获取值,并使用函数将它们组合。
[为简单起见,假设M
为Integer
,N
为String
。我尝试编写以下内容:
public interface SomeExternalApi {
Maybe<String> getLatestString();
}
public interface SomeInternalApi {
Observable<Integer> getIntegerObservable();
}
public class MyClass {
private final SomeExternalApi externalApi;
private final SomeInternalApi internalApi;
public Disposable subscribe(Consumer<? super IntegerAndString> observer) {
return internalApi
.getIntegerObservable()
.withLatestFrom(
externalApi.getLatestString().toObservable(),
integerData, stringData -> new IntegerAndString(integerData, stringData)
.subscribe(observer);
}
private static class IntegerAndString {
// data class that holds onto both
}
}
我认为,这里的问题是,由Observable
生成的externalApi.getLatestString()
只是被冻结,无论其被调用时的值是多少。因此,无论从IntegerAndString
返回的原始值是什么,它都会继续发送Maybe<String>
。
是否有一种方法可以编写一个Observable
,以便在调用时可以从Maybe
懒惰地获取其数据,或者可以在这里使用其他模式?
Maybe<T>
定义为最多发射一个元素。
[我想您基本上是想重复拨打externalApi.getLatestString()
。我尚未对此进行测试,但是您可以在此处尝试使用Observable.defer(...)
:
public Disposable subscribe(Consumer<? super IntegerAndString> observer) {
return internalApi
.getIntegerObservable()
.withLatestFrom( Observable.defer(
() -> externalApi.getLatestString().toObservable() ),
integerData, stringData -> new IntegerAndString(integerData, stringData)
.subscribe(observer));
}