如何通过Flux用不同的调度程序运行两个任务

问题描述 投票:0回答:1

我尝试使用两种策略(串行和并行)来存储和解析并存储一些原始数据

    Flux<PanasonicData> f = Flux.create(sink -> dataRepo.addConsumer(sink::next));
    Flux.from(f).publishOn(Schedulers.single()).subscribe(this::save1);
    Flux.from(f).publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

    ConnectableFlux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish();
    cf.autoConnect().publishOn(Schedulers.single()).subscribe(this::save1);
    cf.autoConnect().publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

但是第二项任务从未执行!!!如何使用这两种不同的策略来运行这两项任务?

spring flux reactor
1个回答
0
投票

您可以通过autoConnect(int minSubscribers)指定最小订户数:

Flux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish().autoConnect(2);
cf.publishOn(Schedulers.single()).subscribe(this::save1);
cf.publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);
© www.soinside.com 2019 - 2024. All rights reserved.