我需要从服务器检索无限量的数据。这应该采取以下方式:
我怎么能用observables做到这一点?
现在我只能想到来自同一函数内的递归可观察调用。
const send = execSend() {
this.send(message).subscribe(resp => {
if (resp === 'end') {
subscriber.next(byteArr.join(''));
console.log('finished');
subscriber.complete();
} else {
byteArr.push(resp);
execSend();
}
});
}();
以下内容应该复制您的想法:在完成之前发出1个事件。
import { of } from 'rxjs';
import { expand, takeWhile, reduce } from 'rxjs/operators';
let count = 0;
const FINISH = "finished";
const limit = 5;
const send$ = () => of(count++ < limit ? "sent" : FINISH);
const expander$ = send$().pipe(
expand(resp => send$()),
takeWhile(resp => resp !== FINISH),
reduce((acc, val) => acc ? acc + val : val, null)
);
const subscribe = expander$.subscribe(console.log);
你可以看到它在this blitz工作
没有测试,但你可以尝试这种模式
exec=()=>http.get(....)
exec().pipe(
expand((resp)=>exec()),
takeWhile(resp=>resp !== 'end'),
scan((acc,curr)=>acc.concat(curr),[])
).subscribe()
这样的事情:
let todo = true;
interval(100).pipe(
takeWhile(()=>todo),
concatMap(()=>getStuff())
).subscribe(data => {
todo = !isFinished(data);
});