我想知道如果发布者失去代理(例如关闭),发布者是否可以安全地假设订阅属于它并在其上调用java.util.concurrent.Flow.Subscriber#onComplete
。下面的代码示例演示了两难问题(显然它只是一些合成代码来演示问题):
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TimePublisher implements Flow.Publisher<Long> {
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
private final ConcurrentLinkedQueue<Flow.Subscriber<? super Long>> subscribersList = new ConcurrentLinkedQueue<>();
private TimePublisher() {
}
public static TimePublisher newInstance() {
TimePublisher timePublisher = new TimePublisher();
timePublisher.startTickScheduler();
return timePublisher;
}
private void startTickScheduler() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
// does not make too much sense: just for the sake of the example
final long currentTimeMillis = System.currentTimeMillis();
subscribersList.forEach(sub -> sub.onNext(currentTimeMillis));
}, 1, 1, TimeUnit.SECONDS);
}
@Override
public void subscribe(Flow.Subscriber<? super Long> subscriber) {
subscribersList.add(subscriber);
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
// no-op in this sample
}
@Override
public void cancel() {
subscribersList.remove(subscriber);
}
});
}
public void stop() {
// the publisher can be stopped from the outside: after that it will
// definitely not emit any next items.
scheduledExecutorService.shutdown();
// QUESTION: can we assume that a Subscriber is subscribed to only this Publisher?
// if it is subscribed to another publisher, the following is illegal, as onNext
// could potentially be called by another Publisher...
subscribersList.forEach(Flow.Subscriber::onComplete);
subscribersList.clear();
}
}
TimePublisher#stop
时,这个特定的发布者绝对不会发出任何onNext
调用,因此调用onComplete
似乎是一个合理的选择onComplete
可能是非法的,因为另一个发布者仍然可以发出项目。Subscriber
的文档说
对于每个Flow.Subscription,将按严格的顺序调用此接口中的方法。
特别是onComplete
:
当已知对于尚未由错误终止的订阅不会发生其他订阅者方法调用时调用的方法,之后订阅不会调用其他订阅者方法。如果此方法抛出异常,则结果行为未定义。
因此,其他Subscription
s继续调用方法是合法的。
Flow
文档说在Subscription
实现中有多个Subscriber
s是可能的,但不建议:
因为对给定Flow.Subscription的订阅者方法调用是严格排序的,所以除非订阅者维护多个订阅,否则这些方法不需要使用锁或易失性(在这种情况下,最好定义多个订阅者,每个订阅者都有自己的订阅)。