有没有一种 Rx 可观察对象,其行为类似 ReplaySubject,但只对第一个订阅者重放?

问题描述 投票:0回答:2

有什么优雅的方式来编写类似 `ReplaySubject` 的 Rx 可观察对象,但它只发出一次累积的序列,并且仅针对第一个订阅者(在该订阅者连接时)?在第一次订阅之后,它应该表现得像普通的 `Subject`。

这适用于 .NET 项目,但我同样感谢 JavaScript/RxJS 答案。

我在谷歌上搜索过可能的解决方案;最终我大概会自己实现一个,就像我之前实现 `DistinctSubject` 时那样。

javascript c# rxjs reactive-programming system.reactive
2个回答
2
投票

我稍微修改了一个类似问题中的实现,并将类的名称从 `ReplayOnceSubject` 更改为 `ReplayFirstSubscriberOnlySubject`:

/// <summary>
/// A subject that records notifications in a <c>ReplaySubject</c> until the
/// first observer subscribes, replays the recorded notifications to that
/// observer, and from then on delegates to a plain <c>Subject</c>.
/// </summary>
public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
    // Guards every access to _subject, including the one-time swap in Subscribe.
    private readonly object _locker = new object();

    // Current delegate: a ReplaySubject<T> until the first subscription,
    // a Subject<T> afterwards.
    private ISubject<T> _subject = new ReplaySubject<T>();

    /// <summary>Forwards a value to the current delegate.</summary>
    public void OnNext(T value)
    {
        lock (_locker)
        {
            _subject.OnNext(value);
        }
    }

    /// <summary>Forwards an error to the current delegate.</summary>
    public void OnError(Exception error)
    {
        lock (_locker)
        {
            _subject.OnError(error);
        }
    }

    /// <summary>Forwards completion to the current delegate.</summary>
    public void OnCompleted()
    {
        lock (_locker)
        {
            _subject.OnCompleted();
        }
    }

    /// <summary>
    /// Subscribes an observer. The first call replays the recorded
    /// notifications to the observer and swaps the delegate for a plain
    /// subject; later calls subscribe directly to that subject.
    /// </summary>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="observer"/> is null.
    /// </exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer is null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        lock (_locker)
        {
            // Not the first subscription: the delegate has already been swapped.
            if (!(_subject is ReplaySubject<T> buffered))
            {
                return _subject.Subscribe(observer);
            }

            // Attach the observer to the live subject first so that the
            // replay below reaches it.
            var live = new Subject<T>();
            var result = live.Subscribe(observer);

            // Push the recorded notifications through the live subject, then
            // detach and release the recording subject.
            IDisposable replay = buffered.Subscribe(live);
            replay.Dispose();
            buffered.Dispose();

            _subject = live;
            return result;
        }
    }
}

这可能不是最高效的解决方案,因为每次操作都会获取两个不同的 `lock`(`_locker` 以及内部的 `_gate`),但它应该也不会很糟糕。


0
投票

感谢 Theodor 的回答。

我已将您的逻辑移植到 TypeScript。由于 JavaScript 和 Node.js 是单线程的,并且这里没有 `async` 操作,因此我不需要 .NET 代码所需的锁定机制。


代码

import * as rxjs from 'rxjs';

/**
 * A subject that buffers every notification until its first subscriber
 * arrives, replays that buffer to the first subscriber only, and from then
 * on behaves like a plain `Subject`: later subscribers receive only
 * notifications sent after they subscribed.
 *
 * The buffering `ReplaySubject` is released as soon as it has been replayed,
 * so values sent after the first subscription are not retained.
 * https://stackoverflow.com/a/69390202/470818
 */
export class ReplayFirstSubscriberOnlySubject<T>
  implements rxjs.SubjectLike<T>
{
  /**
   * Records notifications sent before the first subscription.
   * Set to `undefined` once it has been replayed to the first subscriber.
   */
  private buffer: rxjs.ReplaySubject<T> | undefined =
    new rxjs.ReplaySubject<T>();

  /**
   * Subject every subscriber is attached to. It receives the replayed
   * buffer once, then every notification sent afterwards.
   */
  private readonly live = new rxjs.Subject<T>();

  /**
   * Sends a value: into the buffer while nobody has subscribed yet,
   * otherwise straight to the current subscribers.
   */
  public next(value: T): void {
    (this.buffer ?? this.live).next(value);
  }

  /**
   * Sends an error notification through the same path as `next`.
   */
  public error(error: unknown): void {
    (this.buffer ?? this.live).error(error);
  }

  /**
   * Sends a completion notification through the same path as `next`.
   */
  public complete(): void {
    (this.buffer ?? this.live).complete();
  }

  /**
   * Attaches an observer. The first call also replays everything that was
   * buffered so far (including a buffered error or completion) to that
   * observer; later calls only see notifications sent after they subscribed.
   *
   * @param observer Callbacks to notify; any of them may be omitted.
   * @returns A handle whose `unsubscribe()` detaches the observer.
   */
  public subscribe(observer: Partial<rxjs.Observer<T>>): rxjs.Unsubscribable {
    // Attach first so the replay below reaches this observer.
    const subscription = this.live.subscribe(observer);

    const buffered = this.buffer;
    if (buffered) {
      // Stop buffering before replaying, so notifications sent from inside
      // an observer callback go straight to `live` and the buffer can be
      // garbage-collected afterwards.
      this.buffer = undefined;
      buffered
        .subscribe({
          next: (value) => this.live.next(value),
          error: (err: unknown) => this.live.error(err),
          complete: () => this.live.complete(),
        })
        .unsubscribe();
    }

    return subscription;
  }
}

Jest 测试

describe('ReplayFirstSubscriberOnlySubject', () => {
  it('should replay buffered events to the first subscriber only', () => {
    // One spy pair per subscriber.
    const first = { next: jest.fn(), complete: jest.fn() };
    const second = { next: jest.fn(), complete: jest.fn() };

    const sut = new ReplayFirstSubscriberOnlySubject<string>();

    // Emitted before anyone subscribes: must be buffered.
    for (const value of ['A', 'B']) {
      sut.next(value);
    }

    // The first subscriber joins and should receive the buffer.
    sut.subscribe(first);

    // Emitted while only the first subscriber is attached.
    sut.next('C');

    // The second subscriber joins and must not receive anything older.
    sut.subscribe(second);

    // Emitted while both subscribers are attached, then finish.
    sut.next('D');
    sut.complete();

    // The first subscriber saw the buffered values and the live ones, in order.
    const expectedForFirst = ['A', 'B', 'C', 'D'];
    expect(first.next).toHaveBeenCalledTimes(expectedForFirst.length);
    expectedForFirst.forEach((value, index) => {
      expect(first.next).toHaveBeenNthCalledWith(index + 1, value);
    });
    expect(first.complete).toHaveBeenCalledTimes(1);

    // The second subscriber saw only what was emitted after it joined.
    const expectedForSecond = ['D'];
    expect(second.next).toHaveBeenCalledTimes(expectedForSecond.length);
    expectedForSecond.forEach((value, index) => {
      expect(second.next).toHaveBeenNthCalledWith(index + 1, value);
    });
    expect(second.complete).toHaveBeenCalledTimes(1);
  });
});
© www.soinside.com 2019 - 2024. All rights reserved.