在RxJava流中添加线程池

问题描述 投票:7回答:2

我想知道是否可以使用RxJava库,以便在以下用例中添加一些并发性:

  • 从具有自定义String(类似于ResultSet的现有Observable中依次获取ResultSetObservable.create(resultSet)列]
  • 为这些值中的每个值调用Web服务(例如,使用InvokeWebServiceFunc1<String, Pair<String, Integer>>()实例),以检索与String相关的某些统计信息(请注意,String中的Pair与一个传入的输入)
  • 以CSV格式(带有ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream))打印所有内容。

所以这就是我所拥有的:

ResultSetObservable.create(resultSet)
    .map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
    .subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));

它很好用,但是由于Web服务可能需要一些时间来输入String,所以我想通过具有类似映射的行为的线程池(例如10个线程)来增加一些并发性。但是我需要 ExportAsCSVAction0在同一线程中被调用(实际上当前线程是完美的)。

您能帮我吗?我不确定在这里使用toBlocking().forEach()模式是否是正确的解决方案,并且我不知道在Schedulers.from(fixedThreadPool)observeOn()中使用subscribeOn()的位置。

谢谢您的帮助!

java multithreading asynchronous concurrency rx-java
2个回答
21
投票

0
投票
© www.soinside.com 2019 - 2024. All rights reserved.