我有一个Flowable
,例如:
Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
.doOnSubscribe(sub -> {
System.out.println("onSubscribe");
})
.onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
.buffer(1, TimeUnit.SECONDS)
.doOnTerminate(() -> {
System.out.println("onTerminate");
})
.onErrorReturn(error -> {
System.out.println("Error, will cancel scan: " + error);
throw new RuntimeException(error);
})
.subscribe(objs -> /* NIO work here */);
[当我在源发射器上调用onComplete
时,紧接在onNext
调用之后,我在subscribe
lambda中收到中断。我认为这是预期的行为:https://github.com/ReactiveX/RxJava/issues/3601。这似乎是在订户处理缓冲区中的下游对象时发生的(我想如果它是单线程的就不会有问题)。
这是一个问题,因为我在这里执行NIO工作,这对Thread.interrupt
发生了非常具体的要求。这是在缓冲区完成发送所有工作之前发生的,因此某些objs
未被完全处理。
这和Scheduler
有关吗?我应该使用IO之一吗?如何“保护”在订户中执行的工作
.onSubscribe(Schedulers.io(), false)
.subscribe(objs -> /* NIO work here */);