我在一个可观察的范围内设置HTTP请求之间的间隔有问题。事情是-我希望每个http请求之间都有延迟,等待所有请求完成,并对所有请求中的组合数据执行操作。当然,文件数组的长度是未知的。
示例代码:
const documents = [
{
'documentId': 1,
'documentName': 'DocName1'
},
{
'documentId': 2,
'documentName': 'DocName2'
},
];
function saveDocumentService(document) {
document['someAnotherData'] = '123';
return of(document); // in real world http.post()
}
const documentsToSave$ = zip(
documents,
interval(200),
document => {
document['someDataToBeInserted'] = {'data': 123};
return saveDocumentService(document);
}
);
const sub = forkJoin(documentsToSave$).subscribe(documents => {
console.log('All documents uploaded', documents); // array of responses
});
使用这种方法,仅输出最后一个值。
感谢。
您可以将merge
与计时器一起使用,而忽略该计时器的输出:
import { of, from, timer } from 'rxjs'
import { concatMap, merge, ignoreElements, tap, toArray } from 'rxjs/operators'
const documents = [
{
'documentId': 1,
'documentName': 'DocName1'
},
{
'documentId': 2,
'documentName': 'DocName2'
},
];
function saveDocumentService(document) {
document['someAnotherData'] = '123';
return of(document)
}
from(documents)
.pipe(
concatMap(url => saveDocumentService(url).pipe(
tap(res => console.log('Saved document...')),
merge(timer(1000).pipe(ignoreElements()))
)),
toArray(),
)
.subscribe(documents => {
console.log('Sub:', documents)
})
您可以使用rxjs
pipe
运算符将take
和interval
组合为
counter: number = 0;
items: string[] = ["one", "tow", "three", "four"];
ngOnInit() {
interval(2000)
.pipe(take(this.items.length))
.subscribe(res => {
console.log(console.log(this.items[this.counter++]));
});
}
编辑:您需要使用Observable.create
创建一个可观察流,您可以在固定间隔创建可观察物时使用setInterval
,最后可以将其标记为已完成。
一旦订阅,一旦发出新值并且将可观察对象标记为完成,我们将执行单独的方法
obs: Observable<any>;
counter: number = 0;
items: string[] = ["one", "two", "three", "four"];
ngOnInit() {
this.obs = Observable.create(observer => {
let intervalID = setInterval(() => {
observer.next(this.items[this.counter++]);
if (this.counter >= this.items.length) {
clearInterval(intervalID);
observer.complete();
}
}, 1000);
});
this.obs.subscribe(
res => {
console.log(res);
},
err => {
console.log(`Error: ${err}`);
},
() => {
console.log("complete");
}
);
}
在https://stackblitz.com/edit/angular-regular-interval-observables-with-complete的Stackblitz
尝试使用combineLatest
。但是请注意,直到每个可观察对象发出至少一个值之前,CombineLatest才会发出初始值。您可以查看文档以获取更多信息:combineLatest
因为您具有documentsToSave$
类型的Observable<Observable<any>>
,所以将forkJoin
替换为mergeAll
和toArray
const sub = documentsToSave$.pipe(mergeAll(), toArray()).subscribe(documents => {
console.log('All documents uploaded', documents); // array of responses
});
}