RXJS 的 concatAll 运算符如何确定先前的内部可观察量是否已完成?

问题描述 投票:0回答:1

目标

我试图了解 RXJS 的

concatAll()
运算符如何工作,以及它何时确定内部 RXJS
Observable
完成。

目前的认识

根据我对

concatAll()
文档的解释,
concatAll()
不会订阅发送到源的下一个可观察量直到发送到所述源的上一个可观察量已完成。

然而,在整理本地演示来测试我对操作员的理解时,我收到了意想不到的结果。

我的问题的演示

我的演示有两个“小部件”:

  • 日历小部件 (📅)
  • 聊天小部件 (💬)

首先,“应用程序”首先发布一个

LOGIN
事件来告诉小部件进行初始化。接下来,日历小部件处理
LOGIN
事件并发布
CALENDAR_HAS_BIRTHDAYS
事件。

但是,此时,我期望将

concatAll()
通过管道传输到
source
RXJS
Subject
,以防止
CALENDAR_HAS_BIRTHDAYS
到达聊天小部件。我认为由于与
LOGIN
事件关联的 Observable 尚未完成,
concatAll()
将等待向订阅者发出
CALENDAR_HAS_BIRTHDAYS
事件,直到完成为止。

下面是我的演示的代码,这是相关 Stackblitz 的链接:https://stackblitz.com/edit/kzpxzi-zsqznz?file=index.ts

import { concatAll, Observable, Subject } from 'rxjs';

const source = new Subject<Observable<string>>();
const eventBus = source.pipe(concatAll());

const publish = (event) => {
  source.next(
    new Observable((subscriber) => {
      subscriber.next(event);
      subscriber.complete();
    })
  );
};

// Calendar Widget
eventBus.subscribe({
  next: (event) => {
    if (event === 'LOGIN') {
      console.log('📅 Calendar initializing...');
      publish('CALENDAR_HAS_BIRTHDAYS');
    }
  },
});

// Chat Widget
eventBus.subscribe({
  next: (event) => {
    if (event === 'LOGIN') {
      console.log('💬 Chat initializing...');
    }

    if (event === 'CALENDAR_HAS_BIRTHDAYS') {
      console.log('💬 Chat opening birthday prompt');
    }
  },
});

// LOGIN event
publish('LOGIN');

如果您检查日志,您将看到以下内容:

📅 Calendar initializing...
💬 Chat opening birthday prompt
💬 Chat initializing...

问题

在我的演示中,我期望日志输出以下内容:

📅 Calendar initializing...
💬 Chat initializing...
💬 Chat opening birthday prompt

这就是我对

concatAll()
的理解崩溃的地方。以下是我对
concatAll()
和我的代码将一起执行的想法:

  1. publish
    通过
    LOGIN
    事件
  2. 进行调用
  3. 一个 RXJS
    Observable
    被发送到
    source
    RXJS
    Subject
    ,随后只要订阅此可观察值,它就会发出一个
    LOGIN
    字符串
  4. concatAll()
    检查是否有任何不完整的 Observable 值发送到
    source
  5. 因为
    source
    上没有不完整的 observable,所以
    concatAll()
    订阅了步骤 2 中的 observable
  6. concatAll()
    将发射值从内部可观察值中“拉出”,即
    "LOGIN"
  7. concatAll()
    向 eventBus 的订阅者发出
    LOGIN
    事件
  8. 日历小部件 (📅) 处理
    LOGIN
    事件
  9. 日历小部件 (📅) 记录“正在初始化...”
  10. 日历小部件 (📅) 通过
    publish
    事件调用
    CALENDAR_HAS_BIRTHDAYS
  11. 一个 RXJS
    Observable
    被发送到
    source
    RXJS
    Subject
    ,随后只要订阅此可观察值,它就会发出一个
    CALENDAR_HAS_BIRTHDAYS
    字符串
  12. concatAll()
    检查是否有任何不完整的 Observable 值发送到
    _eventBus

此时,我预计步骤 12 如下:

  1. 因为存在不完整的可观察量(即第 2 步中发送到
    source
    的可观察量),
    concatAll()
    订阅该可观察量

然而,

CALENDAR_HAS_BIRTHDAYS
事件was被发送给订阅者,因此“💬聊天打开生日提示”就在
📅 Calendar initializing...
日志之后记录。

我不确定

CALENDAR_HAS_BIRTHDAYS
事件是如何发出的。尽管
LOGIN
事件确实导致订阅者发布了另一个事件,但我认为 RXJS 此时应该知道与
Observable
关联的
LOGIN
尚未完成。

我对 RXJS 的理解(可能还有与 JavaScript 调用堆栈的关系)让我对演示中的日志顺序感到困惑?

javascript typescript rxjs
1个回答
0
投票

好问题!

事情是这样的:你可能会认为 rxjs 是管道,设置管道,以便信息流过并从另一端流出,但你最好将 rxjs 视为管道计划或指令,委托 rxjs 代码库根据需要(即订阅时)指定管道。

如果您这样看,那就更有意义了,因为您订阅了两次,因此当

source
发出字符串时,
concatAll
运算符会为每个订阅者运行一次

解决此问题的方法是将

source.pipe(concatAll())
替换为
source.pipe(concatAll(), share())

当您使用

share
shareReplay
时,无论下游订阅多少次,上游的所有内容都将完成一次。


¹ “一次”有以下例外情况:

  1. 如果从未订阅,则根本不会计算!
  2. 当订阅者数量从 0 变为任意时,
  3. share
    就会起作用。如果所有订阅结束,则再次订阅,
    share
    再次重新启动,以便计算上游操作。
© www.soinside.com 2019 - 2024. All rights reserved.