处理来自发布者的同等消耗的数据

问题描述 投票:0回答:1

如何处理从Publisher并行订阅的数据?

  1. 我是否应该在一批工人中循环订阅?当我在Project Reactor中呼叫订阅时,我只得到了一块数据。如何“全部沥干”?
  2. 如何确保每个工作人员将获取不同的数据块?
java rx-java project-reactor reactive-streams
1个回答
0
投票

将每个数据块转换为任务并将其提交给Executor。转换器可能如下所示:

class Converter implements Subscriber<T> {
    final Executor executor;
    Subscription subscription;

    Converter(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override
    public void onNext(T data) {
        executor.execute(()->process(data));
        subscription.request(1);
    }
    ...        
    void process(T o) {
        ...
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.