基本上我想创建一个队列。比如说
const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))
在那里我可以打电话
queue.next({arg1, arg2, arg3})
在几个地方,有时很快就会相继出现。我不能用 throttle
或 debounce
因为我不能失去中间的调用。我需要每个电话都被呼叫,但每秒钟不超过1次。如果两个人在一秒内开火,一个人要等一秒。如果一秒内有3个呼叫,一个要等1秒,另一个要等2秒。
combineAll将在前一个观测点完成后发出下一个观测点。
1. 创建您的主题
const source$$ = new Subject();
2. 提供一个函数,将你的值映射到1000毫秒后完成的Observable。
const timeCompletedSource$ = (time) => (value) => Observable.create(observer => observer.next(v)).pipe(
takeUntil(timer(time))
);
你不需要把函数time做成动态的,我只是把(time)=> (value)=>......因为我想写一个像throttle(1000)这样有动态时间范围的操作符。你可以直接写(value) =>Observable......如果你想要一个静态时间的话
3. 使用你的函数将你的值映射到一个有时间框的观测值上,并将所有观测值合并到concatAll中。
const result$ = source$$.pipe(
map(timeCompletedSource$(time))
concatAll()
);
你发现一个正在运行的stackblitz 此处
专业的。做一个自定义的操作者
const fastThrottle = (time) => (source) => source.pipe(
map(timeCompletedSource$(1000)),
concatAll()
)
const result$ = source$$.pipe(
fastThrottle(1000)
);
我最近发现自己也遇到了同样的情况,我消耗的api每秒钟只能接受4个请求。
这是我想出的办法。
一个速率限制管道
import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'
export function rateLimit<T>(
count: number,
slidingWindowTime: number,
scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
let tokens = count
const tokenChanged = new BehaviorSubject(tokens)
const consumeToken = () => tokenChanged.next(--tokens)
const renewToken = () => tokenChanged.next(++tokens)
const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))
return mergeMap<T, Observable<T>>((value: T) =>
availableTokens.pipe(
take(1),
map(() => {
consumeToken()
timer(slidingWindowTime, scheduler).subscribe(renewToken)
return value
}),
),
)
}
你可以这样使用.我想从api中获取contractIds$中的所有合同.我只想每1000ms发送4个请求。
const contracts$ = contractIds$.pipe(
rateLimit(4, 1000),
mergeMap(contract => this.get(contract.DocumentNumber)),
)
也许这能帮助你:)