RXJS如何延迟每个可观察的(http请求)并组合所有请求的输出

问题描述 投票:1回答:4

我在一个可观察的范围内设置HTTP请求之间的间隔有问题。事情是-我希望每个http请求之间都有延迟,等待所有请求完成,并对所有请求中的组合数据执行操作。当然,文件数组的长度是未知的。

示例代码:

const documents = [
  {
    'documentId': 1,
    'documentName': 'DocName1'
  },
  {
    'documentId': 2,
    'documentName': 'DocName2'
  },
];

function saveDocumentService(document) {
  document['someAnotherData'] = '123';

  return of(document); // in real world http.post()
}

const documentsToSave$ = zip(
  documents,
  interval(200),
  document => {
    document['someDataToBeInserted'] = {'data': 123};
    return saveDocumentService(document);
  }
);

const sub = forkJoin(documentsToSave$).subscribe(documents => {
  console.log('All documents uploaded', documents); // array of responses
});

使用这种方法,仅输出最后一个值。

感谢。

angular rxjs observable rxjs6
4个回答
1
投票

您可以将merge与计时器一起使用,而忽略该计时器的输出:

import { of, from, timer } from 'rxjs'
import { concatMap, merge, ignoreElements, tap, toArray } from 'rxjs/operators'

const documents = [
  {
    'documentId': 1,
    'documentName': 'DocName1'
  },
  {
    'documentId': 2,
    'documentName': 'DocName2'
  },
];

function saveDocumentService(document) {
  document['someAnotherData'] = '123';
  return of(document)
}
from(documents)
  .pipe(
    concatMap(url => saveDocumentService(url).pipe(
      tap(res => console.log('Saved document...')),
      merge(timer(1000).pipe(ignoreElements()))
    )),
    toArray(),
  )
  .subscribe(documents => {
    console.log('Sub:', documents)
  })

Stackblitz


0
投票

您可以使用rxjs pipe运算符将takeinterval组合为

counter: number = 0;
items: string[] = ["one", "tow", "three", "four"];

  ngOnInit() {
    interval(2000)
      .pipe(take(this.items.length))
      .subscribe(res => {
        console.log(console.log(this.items[this.counter++]));
      });
  }

编辑:您需要使用Observable.create创建一个可观察流,您可以在固定间隔创建可观察物时使用setInterval,最后可以将其标记为已完成。

一旦订阅,一旦发出新值并且将可观察对象标记为完成,我们将执行单独的方法

obs: Observable<any>;
counter: number = 0;
items: string[] = ["one", "two", "three", "four"];



ngOnInit() {
    this.obs = Observable.create(observer => {
      let intervalID = setInterval(() => {
        observer.next(this.items[this.counter++]);
        if (this.counter >= this.items.length) {
          clearInterval(intervalID);
          observer.complete();
        }
      }, 1000);
    });

    this.obs.subscribe(
      res => {
        console.log(res);
      },
      err => {
        console.log(`Error: ${err}`);
      },
      () => {
        console.log("complete");
      }
    );
  }

https://stackblitz.com/edit/angular-regular-interval-observables-with-complete的Stackblitz


0
投票

尝试使用combineLatest。但是请注意,直到每个可观察对象发出至少一个值之前,CombineLatest才会发出初始值。您可以查看文档以获取更多信息:combineLatest


0
投票

因为您具有documentsToSave$类型的Observable<Observable<any>>,所以将forkJoin替换为mergeAlltoArray

const sub = documentsToSave$.pipe(mergeAll(), toArray()).subscribe(documents => {
  console.log('All documents uploaded', documents); // array of responses
});
  }

here is working example

© www.soinside.com 2019 - 2024. All rights reserved.