ReplaySubject
的 Rx 可观察对象,但仅发出一次累积序列,并且仅针对第一个订阅者(当该订阅者连接时)?第一次订阅之后,它应该像普通的 Subject 一样工作。
这是用于一个 .NET 项目的,但我同样欢迎 JavaScript/RxJS 的答案。
我在谷歌上搜索过可能的解决方案;最终我打算自己实现一个,做法类似于我之前实现 DistinctSubject 的方式。
我稍微修改了一个类似问题中的实现,并将类名从 ReplayOnceSubject 更改为 ReplayFirstSubscriberOnlySubject:
/// <summary>
/// A subject that buffers notifications in a <c>ReplaySubject</c> until the
/// first observer subscribes, replays the buffer to that observer, and from
/// then on forwards notifications through a plain <c>Subject</c>.
/// </summary>
/// <typeparam name="T">The type of the notifications.</typeparam>
public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
    // Guards every read and write of _subject, including the swap in Subscribe.
    private readonly object _locker = new object();

    // Starts as a ReplaySubject<T>; replaced by a Subject<T> on the first Subscribe call.
    private ISubject<T> _subject = new ReplaySubject<T>();

    /// <summary>Forwards a value to the current underlying subject.</summary>
    public void OnNext(T value)
    {
        lock (_locker)
        {
            _subject.OnNext(value);
        }
    }

    /// <summary>Forwards an error to the current underlying subject.</summary>
    public void OnError(Exception error)
    {
        lock (_locker)
        {
            _subject.OnError(error);
        }
    }

    /// <summary>Forwards completion to the current underlying subject.</summary>
    public void OnCompleted()
    {
        lock (_locker)
        {
            _subject.OnCompleted();
        }
    }

    /// <summary>
    /// Subscribes an observer. The first call replays the buffered
    /// notifications and swaps the underlying subject; later calls subscribe
    /// directly to the swapped-in subject.
    /// </summary>
    /// <param name="observer">The observer to subscribe.</param>
    /// <returns>The observer's subscription to the underlying subject.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        lock (_locker)
        {
            var buffered = _subject as ReplaySubject<T>;
            if (buffered == null)
            {
                // The swap has already happened: behave like an ordinary subject.
                return _subject.Subscribe(observer);
            }

            // Attach the observer to the new subject first, so that the
            // replay below reaches it.
            var live = new Subject<T>();
            var handle = live.Subscribe(observer);

            // Replay the buffered notifications through the new subject,
            // then detach from and dispose the buffer.
            var replay = buffered.Subscribe(live);
            replay.Dispose();
            buffered.Dispose();

            _subject = live;
            return handle;
        }
    }
}
这可能不是最高效的解决方案,因为每次操作都会获取两个不同的锁(_locker 和内部的 _gate),但它应该也不会太差。
我已将您的逻辑移植到 TypeScript。由于 JavaScript 和 Node.js 是单线程的,并且这里没有 async 操作,因此我不需要 .NET 代码所需的锁定机制。
import * as rxjs from 'rxjs';
/**
 * A custom `ReplaySubject` that only replays the buffered events
 * to the first subscriber. All subsequent subscribers will only
 * receive new events, as if subscribed to a `Subject`.
 *
 * Events are buffered only until the first subscription. At that point the
 * buffer is drained into a plain `Subject` and released, so memory use does
 * not keep growing for the lifetime of the instance. If the source already
 * completed or errored before the first subscription, that termination is
 * replayed too, so later subscribers are notified instead of hanging.
 * https://stackoverflow.com/a/69390202/470818
 */
export class ReplayFirstSubscriberOnlySubject<T>
  implements rxjs.SubjectLike<T>
{
  /**
   * Collects everything emitted before the first subscription.
   * Set to `undefined` once it has been replayed.
   */
  private pending: rxjs.ReplaySubject<T> | undefined =
    new rxjs.ReplaySubject<T>();

  /**
   * Subject that every subscriber (first and subsequent) is attached to.
   * It receives the replayed buffer once, then all new events directly.
   */
  private readonly live = new rxjs.Subject<T>();

  /**
   * Emits a value: buffered while nobody has subscribed yet,
   * delivered to the current subscribers afterwards.
   */
  public next(value: T): void {
    (this.pending ?? this.live).next(value);
  }

  /**
   * Emits an error: buffered while nobody has subscribed yet,
   * delivered to the current subscribers afterwards.
   */
  public error(error: Error): void {
    (this.pending ?? this.live).error(error);
  }

  /**
   * Emits completion: buffered while nobody has subscribed yet,
   * delivered to the current subscribers afterwards.
   */
  public complete(): void {
    (this.pending ?? this.live).complete();
  }

  /**
   * Subscribes an observer. The first call additionally replays all
   * buffered notifications to that observer.
   *
   * @param observer Callbacks to notify.
   * @returns A handle that detaches the observer when unsubscribed.
   */
  public subscribe(observer: Partial<rxjs.Observer<T>>): rxjs.Unsubscribable {
    // Attach first, so that a replay triggered below reaches this observer.
    const subscription = this.live.subscribe(observer);

    const buffered = this.pending;
    if (buffered) {
      // Switch to live delivery before replaying, so that anything emitted
      // re-entrantly during the replay is not appended to the discarded buffer.
      this.pending = undefined;
      // The replay is synchronous; the temporary link is removed right after
      // and the buffer becomes unreachable.
      buffered.subscribe(this.live).unsubscribe();
    }

    return subscription;
  }
}
describe('ReplayFirstSubscriberOnlySubject', () => {
  it('should replay buffered events to the first subscriber only', () => {
    // Asserts that `spy` was called exactly once per expected value, in order.
    const expectNextCalls = (spy: jest.Mock, expected: string[]): void => {
      expect(spy).toHaveBeenCalledTimes(expected.length);
      expected.forEach((value, index) => {
        expect(spy).toHaveBeenNthCalledWith(index + 1, value);
      });
    };

    // One observer per subscriber, each with its own spies.
    const firstObserver = { next: jest.fn(), complete: jest.fn() };
    const secondObserver = { next: jest.fn(), complete: jest.fn() };

    const subject = new ReplayFirstSubscriberOnlySubject<string>();

    // Emitted before anyone subscribes: buffered for the first subscriber.
    subject.next('A');
    subject.next('B');

    // The first subscriber joins, then a new event goes to current subscribers.
    subject.subscribe({
      next: firstObserver.next,
      complete: firstObserver.complete,
    });
    subject.next('C');

    // The second subscriber joins, then a new event goes to both.
    subject.subscribe({
      next: secondObserver.next,
      complete: secondObserver.complete,
    });
    subject.next('D');

    // Terminate the stream.
    subject.complete();

    // The first subscriber sees the buffered events plus every new one.
    expectNextCalls(firstObserver.next, ['A', 'B', 'C', 'D']);
    expect(firstObserver.complete).toHaveBeenCalledTimes(1);

    // The second subscriber sees only what was emitted after it joined.
    expectNextCalls(secondObserver.next, ['D']);
    expect(secondObserver.complete).toHaveBeenCalledTimes(1);
  });
});