Emitter.onComplete之后缓冲的Flowable中的线程中断

问题描述 投票:0回答:1

我有一个Flowable,例如:

Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
    .doOnSubscribe(sub -> {
        System.out.println("onSubscribe");
    })
    .onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
    .buffer(1, TimeUnit.SECONDS)
    .doOnTerminate(() -> {
        System.out.println("onTerminate");
    })
    .onErrorReturn(error -> {
        System.out.println("Error, will cancel scan: " + error);
        throw new RuntimeException(error);
    })
    .subscribe(objs -> /* NIO work here */);

[当我在源发射器上调用onComplete时,紧接在onNext调用之后,我在subscribe lambda中收到中断。我认为这是预期的行为:https://github.com/ReactiveX/RxJava/issues/3601。这似乎是在订户处理缓冲区中的下游对象时发生的(我想如果它是单线程的就不会有问题)。

这是一个问题,因为我在这里执行NIO工作,这对Thread.interrupt发生了非常具体的要求。这是在缓冲区完成发送所有工作之前发生的,因此某些objs未被完全处理。

这和Scheduler有关吗?我应该使用IO之一吗?如何“保护”在订户中执行的工作

rx-java
1个回答
0
投票
我应该在IO线程上注册我的订户:

.onSubscribe(Schedulers.io(), false) .subscribe(objs -> /* NIO work here */);

© www.soinside.com 2019 - 2024. All rights reserved.