立即可观察的不通过Observable.defer触发

问题描述 投票:3回答:1

我正在尝试使用RxJS缓存来避免不必要地重复某些HTTP调用。在尝试publishReplay时,我得到了以下代码段(受this blog post启发):

let counter = 1;
const updateRequest = Observable.defer(() => mockDataFetch())
  .publishReplay(1, 1000)
  .refCount();

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(0); // <-- delay by 0 milliseconds
}

function mockHttpCache() {
  return updateRequest
    .take(1);
}

setTimeout(() => mockHttpCache().subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => mockHttpCache().subscribe(val => console.log("Response 500:", val)), 500);
setTimeout(() => mockHttpCache().subscribe(val => console.log("Response 1500:", val)), 1500);

这将按预期工作并产生输出:

'Response 50:', 1
'Response 500:', 1
'Response 1500:', 2

但是,当从内部可观察对象中删除.delay(0)时,使其立即生效,在缓存持续时间过去之后,包装器不再产生任何结果。输出为:

'Response 50:', 1
'Response 500:', 1

即使再也没有缓存的项目,似乎也没有调用mockDataFetch来收集新数据。这是否是预期的行为,如果是的话,其背后的原理是什么?

rxjs
1个回答
1
投票

这是将您的代码翻译为RxJs 6.5.5,以及其他一些小的修改:

let counter = 1;
const updateRequest = defer(() => mockDataFetch())
  .pipe(
    publishReplay(1, 1000),
    refCount()
  );

function mockDataFetch() {
  console.log('RESUBSCRIBING');

  return of(counter++)
    .pipe(
      // delay(0), // <-- delay by 0 milliseconds
    );
}

function mockHttpCache() {
  return updateRequest
    .pipe(
      take(1),
    );
}

setTimeout(
  () => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]')
), 50);
setTimeout(
  () => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]')
), 500);
setTimeout(
  () => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]')
), 1500);

StackBlitz


没有delay(0)

首先让我们看看publishReplay is implemented

publishReplay

如我们所见,由于const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined; const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler); return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>; ,它返回一个ConnectableObservable

multicast

这是multicast的样子:

const connectable: any = Object.create(source, connectableObservableDescriptor);
connectable.source = source;
connectable.subjectFactory = subjectFactory;

return <ConnectableObservable<R>> connectable;

现在,让我们仔细看看refCount,尤其是refCount方法:

// `connectable` - the `ConnectableObservable` from above
constructor(private connectable: ConnectableObservable<T>) { }

// `call` - called when the source is subscribed
// `source` - the `ConnectableObservable` from above
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  const { connectable } = this;
  (<any> connectable)._refCount++;

  const refCounter = new RefCountSubscriber(subscriber, connectable);
  const subscription = source.subscribe(refCounter);

  if (!refCounter.closed) {
    (<any> refCounter).connection = connectable.connect();
  }

  return subscription;
}

ConnectableObservable返回ConnectableObservable实例的位置。 subscribe上基本上发生的是将// Invoked as a result of `const subscription = source.subscribe(refCounter);` from `refCount` _subscribe(subscriber: Subscriber<T>) { return this.getSubject().subscribe(subscriber); } protected getSubject(): Subject<T> { const subject = this._subject; if (!subject || subject.isStopped) { this._subject = this.subjectFactory(); } return this._subject!; } 添加到subjectFactory的活动订户列表中。 ReplaySubject会跟踪订阅者的数量,当不再有订阅者时,它将在注册新订阅者(使用相同的const subscription = source.subscribe(refCounter);)实例时自动订阅源。

下一步,将调用RefCounterSubscriber

ReplaySubject执行以下操作:

RefCounterSubscriber

到达这些行时:

ReplaySubject

该来源(例如(<any> refCounter).connection = connectable.connect();)实际上将被订阅。

现在,connectable.connect()是这样实现的(大致):

connectable.connect()

这意味着将首先到达 connect(): Subscription { let connection = this._connection; if (!connection) { this._isComplete = false; connection = this._connection = new Subscription(); connection.add(this.source .subscribe(new ConnectableSubscriber(this.getSubject(), this))); if (connection.closed) { this._connection = null; connection = Subscription.EMPTY; } } return connection; } ,当它发生时,它将发出该值,然后将发送connection.add(this.source .subscribe(new ConnectableSubscriber(this.getSubject(), this))); 通知(最终通过调用mockDataFetch():]]]

of(counter)

因此,除了在链中进一步发送// In this case, `arr = [counter]` new Observable(subscriber => { for (let i = 0; i < arr.length; i++) { subscriber.next(arr[i]); } subscriber.complete(); }); 通知之外,它还将取消订阅。它最终将达到take(1)的取消订阅逻辑,但将无法正常运行,因为一切都在[同步地]进行。在正常

情况下,如果complete没有任何订阅者,则该来源将被取消订阅。 但是由于没有订阅者同时订阅源],所以会有稍微不同的行为。 Subscriber._complete()的订户列表将为空,但源

不会被取消订阅

,因为如上所述,它仍在订阅过程中。[最后的意思是将调用Subscriber._complete(),这又将导致protected _complete(): void { this.destination.complete(); this.unsubscribe(); } 收到complete通知。但是请记住,当源将被重新订阅时,将使用相同的RefCounterSubscriber

下一次再次订阅源时,将到达ReplaySubject

ReplaySubject

subscriber.complete();

这是程序的流程,没有ReplaySubject

complete

这将被记录:

ReplaySubject

带有these lines
这取决于上一节中提到的一些细节。

const refCounter = new RefCountSubscriber(subscriber, connectable); // Subscribing to a **completed** Subject // If the Subject is completed, an EMPTY subscription will be reached const subscription = source.subscribe(refCounter); if (!refCounter.closed) { // Since `closed === true`, this block won't be reached (<any> refCounter).connection = connectable.connect(); } // Returning the EMPTY subscription return subscription; 将在每个EMPTY implementation通知中安排EMPTY中的操作(默认情况下)。该动作的任务是在delay(0)通过之后发出接收到的值。它与使用setTimeout( // Source emits and the value is cached by the subject for 1 second // `take(1)` is reached // Send the value, then a `complete` notif. // But since sending a `complete` notif involves unsubscribing as well // The current subscriber will be removed from the `ReplaySubject`'s subscribers list // Then, the `ReplaySubject` will receive the `complete` notification and the subject becomes **completed** () => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]') ), 50); setTimeout( // Subscribing to a **completed** subject, but because it's a `ReplaySubject` // We'd still be getting the cached values, along with a `complete` notification () => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]') ), 500); setTimeout( // Since `1`'s time expired at 1 second, the `ReplaySubject` will only send a complete notification () => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]') ), 1500); 基本相同,这意味着它

不会同步

但是,当使用RESUBSCRIBING Response 50: 1 complete[1] Response 500: 1 complete[2] complete[3] 时,将同步发送delay(0)通知。 delay(0)

AsyncScheduler

nexted通知最终将在队列为空时发送。但是请记住,这全部是

asynchronous,这意味着0 ms的行为将是

normally

这是程序的流程,带setTimeoutof()

输出:

complete

This is how delay deals with it为了查看delay

closed,您可以在SB项目中打开开发工具,按protected _complete() {
  // `this.queue` is populated when a `nexted` value arrives
  if (this.queue.length === 0) {
    this.destination.complete();
  }

  // Unsubscribe from the previous items from the chain
  // What's further will **not** be affected
  this.unsubscribe();
}
,键入complete并在其上放置

logpoint

第78行(例如:RefCountSubscriber):
delay(0)并且如果您注释掉最后一个setTimeout( // Subscribing to the source, which emits a value and a complete notif, synchronously // `delay` schedules an action that will do its job in 0ms(still asynchronously) // The value is emitted by the `delay`'s scheduled action // `take(1)` is reached // The value will be passed along then a `complete` notif will be sent // Then, the source will be unsubscribed // Due to `refCount`, the complete notif that came from the source // Won't reach the `ReplaySubject`. as it will already be unsubscribed from the source () => mockHttpCache().subscribe(val => console.log("Response 50:", val), null, () => console.warn('complete[1]') ), 50); setTimeout( // Since only `500ms` have passed, this subscriber will receive the cached value (`1`) // and a `complete` notification, due to `take(1)` // But since `take(1)` operates synchronously, the `RefCountSubscriber` would be closed already, so the source won't be re-subscribed (//1) () => mockHttpCache().subscribe(val => console.log("Response 500:", val), null, () => console.warn('complete[2]') ), 500); setTimeout( // `1500ms` passed, since `1000ms` the cache is empty // So the `take(1)` operator will receive nothing, meaning that the source // will be re-subscribed () => mockHttpCache().subscribe(val => console.log("Response 1500:", val), null, () => console.warn('complete[3]') ), 1500); ,您应该会看到类似这样的内容:
RESUBSCRIBING
Response 50:
1
complete[1]
Response 500:
1
complete[2]
RESUBSCRIBING
Response 1500:
2
complete[3]
        
© www.soinside.com 2019 - 2024. All rights reserved.