RxJs无限期可观察的调用

问题描述 投票:1回答:3

我需要从服务器检索无限量的数据。这应该采取以下方式:

  1. 发送初始请求
  2. 检索一部分数据并告诉服务器一切正常,我可以获得更多数据
  3. 重复步骤2和3,直到我收到一个特定值,表示没有更多数据

我怎么能用observables做到这一点?

现在我只能想到来自同一函数内的递归可观察调用。

const send = execSend() {
    this.send(message).subscribe(resp => {
        if (resp === 'end') {
            subscriber.next(byteArr.join(''));
            console.log('finished');
            subscriber.complete();
        } else {
            byteArr.push(resp);
            execSend();
        }
    });
}();
angular typescript rxjs angular2-observables
3个回答
1
投票

以下内容应该复制您的想法:在完成之前发出1个事件。

import { of } from 'rxjs';
import { expand, takeWhile, reduce } from 'rxjs/operators';

let count = 0;
const FINISH = "finished";
const limit = 5;
const send$ = () => of(count++ < limit ? "sent" : FINISH);

const expander$ = send$().pipe(
  expand(resp => send$()),
  takeWhile(resp => resp !== FINISH),
  reduce((acc, val) => acc ? acc + val : val, null)
);

const subscribe = expander$.subscribe(console.log);

你可以看到它在this blitz工作


0
投票

没有测试,但你可以尝试这种模式

exec=()=>http.get(....) 

exec().pipe(
  expand((resp)=>exec()),
  takeWhile(resp=>resp !== 'end'),
  scan((acc,curr)=>acc.concat(curr),[])
).subscribe()

-1
投票

这样的事情:

let todo = true;
interval(100).pipe(
    takeWhile(()=>todo),
    concatMap(()=>getStuff())
).subscribe(data => {
  todo = !isFinished(data);
});
© www.soinside.com 2019 - 2024. All rights reserved.