RxJS 将数组映射到可观察对象并返回到数组中的普通对象

问题描述 投票:0回答:1

我有一个对象数组,我需要将每个对象分别传递到异步方法中(后面的过程用 Promise 处理,然后通过

Observable.fromPromise(...)
转换回 Observable - 这种方式是需要的,因为在情况下使用相同的方法任何时候都可以传递单个对象;该过程正在将对象保存到数据库中)。例如,这是一个对象数组:

[
  {
    "name": "John",
    ...
  },
  {
    "name": "Anna",
    ...
  },
  {
    "name": "Joe",,
    ...
  },
  {
    "name": "Alexandra",
    ...
  },
  ...
]

现在我有一个名为 insert

which
的方法,它将对象插入数据库。数据库实例中的
store
方法返回新创建的 id。最后,复制初始对象并使用其新 id 进行映射:

insert(user: User): Observable<User> {
  return Observable.fromPromise(this.database.store(user)).map(
    id => {
      let storedUser = Object.assign({}, user);
      storedUser.id = id;
      return storedUser;
    }
  );
}

如果我插入单个对象,这很有效。但是,我想添加对插入多个对象的支持,这些对象只需调用单个插入的方法。目前这是我所拥有的,但它不起作用:

insertAll(users: User[]): Observable<User[]> {
  return Observable.forkJoin(
    users.map(user => this.insert(user))
  );
}

insertAll
方法正在按预期插入用户(或者用其他用户填充数据库),但我没有得到任何响应。我正在调试正在发生的事情,似乎
forkJoin
仅从第一个映射的用户那里得到响应,但其他人被忽略。订阅
insertAll
不会执行任何操作,通过
insertAll
上的 catch 或通过订阅
insertAll
中的第二个参数也没有任何错误。

因此,我正在寻找一种解决方案,其中 Observable(在

insertAll
中)会以这种形式发回一组新对象:

[
  {
    "id": 1,
    "name": "John",
    ...
  },
  {
    "id": 2,
    "name": "Anna",
    ...
  },
  {
    "id": 3,
    "name": "Joe",,
    ...
  },
  {
    "id": 4,
    "name": "Alexandra",
    ...
  },
  ...
]

如何解决这个问题?

rxjs observable rxjs5 angular2-observables fork-join
1个回答
9
投票

要将数组转换为可观察的,您可以使用

Rx.Observable.from(array)

要将可观察量转换为数组,请使用

obs.toArray()
。请注意,这确实返回了一个数组的可观察值,因此您仍然需要
.subscribe(arr => ...)
才能将其取出。

也就是说,您的代码

forkJoin
does 看起来是正确的。但如果你确实想尝试
from
,请编写这样的代码:

insertAll(users: User[]): Observable<User[]> {
  return Observable.from(users)
    .mergeMap(user => this.insert(user))
    .toArray();
}

另一种更像 rx 的方法是在值完成时发出它们,而不是像

forkJoin
toArray
那样等待所有值。我们可以省略前面示例中的
toArray
,我们得到了它:

insertAll(users: User[]): Observable<User> {
  return Observable.from(users)
    .mergeMap(user => this.insert(user));
}

正如@cartant提到的,问题可能不在Rx中,可能是你的数据库不支持多个连接。在这种情况下,您可以将

mergeMap
替换为
concatMap
,使 Rx 仅发送 1 个并发请求:

insertAll(users: User[]): Observable<User[]> {
  return Observable.from(users)
    .concatMap(user => this.insert(user))
    .toArray(); // still optional
}
© www.soinside.com 2019 - 2024. All rights reserved.