Java 9被动流:一个用户属于一个发布者

问题描述 投票:2回答:1

我想知道如果发布者失去代理(例如关闭),发布者是否可以安全地假设订阅属于它并在其上调用java.util.concurrent.Flow.Subscriber#onComplete。下面的代码示例演示了两难问题(显然它只是一些合成代码来演示问题):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimePublisher implements Flow.Publisher<Long> {

    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

    private final ConcurrentLinkedQueue<Flow.Subscriber<? super Long>> subscribersList = new ConcurrentLinkedQueue<>();


    private TimePublisher() {

    }

    public static TimePublisher newInstance() {
        TimePublisher timePublisher = new TimePublisher();
        timePublisher.startTickScheduler();

        return timePublisher;
    }

    private void startTickScheduler() {
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // does not make too much sense: just for the sake of the example
            final long currentTimeMillis = System.currentTimeMillis();
            subscribersList.forEach(sub -> sub.onNext(currentTimeMillis));
        }, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Long> subscriber) {

        subscribersList.add(subscriber);

        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // no-op in this sample
            }

            @Override
            public void cancel() {
                subscribersList.remove(subscriber);
            }
        });
    }

    public void stop() {
        // the publisher can be stopped from the outside: after that it will
        // definitely not emit any next items.
        scheduledExecutorService.shutdown();

        // QUESTION: can we assume that a Subscriber is subscribed to only this Publisher?
        // if it is subscribed to another publisher, the following is illegal, as onNext
        // could potentially be called by another Publisher...
        subscribersList.forEach(Flow.Subscriber::onComplete);

        subscribersList.clear();
    }
}
  • 当调用TimePublisher#stop时,这个特定的发布者绝对不会发出任何onNext调用,因此调用onComplete似乎是一个合理的选择
  • 但是,如果订阅者也订阅了另一个发布者,那么调用onComplete可能是非法的,因为另一个发布者仍然可以发出项目。
java reactive-programming java-9 reactive
1个回答
3
投票

Subscriber的文档说

对于每个Flow.Subscription,将按严格的顺序调用此接口中的方法。

特别是onComplete

当已知对于尚未由错误终止的订阅不会发生其他订阅者方法调用时调用的方法,之后订阅不会调用其他订阅者方法。如果此方法抛出异常,则结果行为未定义。

因此,其他Subscriptions继续调用方法是合法的。

Flow文档说在Subscription实现中有多个Subscribers是可能的,但不建议:

因为对给定Flow.Subscription的订阅者方法调用是严格排序的,所以除非订阅者维护多个订阅,否则这些方法不需要使用锁或易失性(在这种情况下,最好定义多个订阅者,每个订阅者都有自己的订阅)。

© www.soinside.com 2019 - 2024. All rights reserved.