concatAll()
运算符如何工作,以及它何时确定内部 RXJS Observable
完成。
根据我对
concatAll()
文档的解释,concatAll()
将不会订阅发送到源的下一个可观察量直到发送到所述源的上一个可观察量已完成。
然而,在整理本地演示来测试我对操作员的理解时,我收到了意想不到的结果。
我的演示有两个“小部件”:
首先,“应用程序”首先发布一个
LOGIN
事件来告诉小部件进行初始化。接下来,日历小部件处理 LOGIN
事件并发布 CALENDAR_HAS_BIRTHDAYS
事件。
但是,此时,我期望将
concatAll()
通过管道传输到 source
RXJS Subject
,以防止 CALENDAR_HAS_BIRTHDAYS
到达聊天小部件。我认为由于与 LOGIN
事件关联的 Observable 尚未完成,concatAll()
将等待向订阅者发出 CALENDAR_HAS_BIRTHDAYS
事件,直到完成为止。
下面是我的演示的代码,这是相关 Stackblitz 的链接:https://stackblitz.com/edit/kzpxzi-zsqznz?file=index.ts
import { concatAll, Observable, Subject } from 'rxjs';
const source = new Subject<Observable<string>>();
const eventBus = source.pipe(concatAll());
const publish = (event) => {
source.next(
new Observable((subscriber) => {
subscriber.next(event);
subscriber.complete();
})
);
};
// Calendar Widget
eventBus.subscribe({
next: (event) => {
if (event === 'LOGIN') {
console.log('📅 Calendar initializing...');
publish('CALENDAR_HAS_BIRTHDAYS');
}
},
});
// Chat Widget
eventBus.subscribe({
next: (event) => {
if (event === 'LOGIN') {
console.log('💬 Chat initializing...');
}
if (event === 'CALENDAR_HAS_BIRTHDAYS') {
console.log('💬 Chat opening birthday prompt');
}
},
});
// LOGIN event
publish('LOGIN');
如果您检查日志,您将看到以下内容:
📅 Calendar initializing...
💬 Chat opening birthday prompt
💬 Chat initializing...
在我的演示中,我期望日志输出以下内容:
📅 Calendar initializing...
💬 Chat initializing...
💬 Chat opening birthday prompt
这就是我对
concatAll()
的理解崩溃的地方。以下是我对 concatAll()
和我的代码将一起执行的想法:
publish
通过 LOGIN
事件Observable
被发送到 source
RXJS Subject
,随后只要订阅此可观察值,它就会发出一个 LOGIN
字符串concatAll()
检查是否有任何不完整的 Observable 值发送到 source
source
上没有不完整的 observable,所以 concatAll()
订阅了步骤 2 中的 observableconcatAll()
将发射值从内部可观察值中“拉出”,即 "LOGIN"
concatAll()
向 eventBus 的订阅者发出 LOGIN
事件LOGIN
事件publish
事件调用 CALENDAR_HAS_BIRTHDAYS
Observable
被发送到 source
RXJS Subject
,随后只要订阅此可观察值,它就会发出一个 CALENDAR_HAS_BIRTHDAYS
字符串concatAll()
检查是否有任何不完整的 Observable 值发送到 _eventBus
此时,我预计步骤 12 如下:
source
的可观察量),concatAll()
不订阅该可观察量然而,
CALENDAR_HAS_BIRTHDAYS
事件was被发送给订阅者,因此“💬聊天打开生日提示”就在📅 Calendar initializing...
日志之后记录。
我不确定
CALENDAR_HAS_BIRTHDAYS
事件是如何发出的。尽管 LOGIN
事件确实导致订阅者发布了另一个事件,但我认为 RXJS 此时应该知道与 Observable
关联的 LOGIN
尚未完成。
我对 RXJS 的理解(可能还有与 JavaScript 调用堆栈的关系)让我对演示中的日志顺序感到困惑?
好问题!
事情是这样的:你可能会认为 rxjs 是管道,设置管道,以便信息流过并从另一端流出,但你最好将 rxjs 视为管道计划或指令,委托 rxjs 代码库根据需要(即订阅时)指定管道。
如果您这样看,那就更有意义了,因为您订阅了两次,因此当
source
发出字符串时,concatAll
运算符会为每个订阅者运行一次 !
解决此问题的方法是将
source.pipe(concatAll())
替换为 source.pipe(concatAll(), share())
share
或 shareReplay
时,无论下游订阅多少次,上游的所有内容都将完成一次。
¹ “一次”有以下例外情况:
share
就会起作用。如果所有订阅结束,则再次订阅,share
再次重新启动,以便计算上游操作。