如何从无限流中创建Observable

问题描述 投票:0回答:2
我正在尝试将RxJava从1升级到2。在我的旧代码中,我有如下方法:

private Observable<Integer> reversRange(int from, int to) { Stream<Integer> intStream = Stream.iterate(to, p -> p - 1); return Observable.from(() -> intStream.iterator()) .takeWhile(n -> n > from) .map(n -> n ); }

但是现在在RxJava 2中,我不能使用from。这等价于什么代码?我在this question中发现它是fromIterable,但我不知道如何在Stream中使用它。

或其他示例,这不仅应用于范围,而且应用于任何无限流。

private Observable<Integer> numbers() { Stream<Integer> intStream = Stream.iterate(0, p -> p + 1); return Observable.from(() -> intStream.iterator()); }

java java-stream rx-java rx-java2 reactive-streams
2个回答
1
投票
如果您只有StreamInteger,则只需执行以下操作:

1
投票
使用Observable.generate(() -> from, (value, emitter) -> { emitter.onNext(value); return value + 1; }); 功能:
© www.soinside.com 2019 - 2024. All rights reserved.