RxJS 每秒钟拨打电话不超过一次,但不要丢失任何电话。

问题描述 投票:0回答:1

基本上我想创建一个队列。比如说

const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))

在那里我可以打电话

queue.next({arg1, arg2, arg3})

在几个地方,有时很快就会相继出现。我不能用 throttledebounce 因为我不能失去中间的调用。我需要每个电话都被呼叫,但每秒钟不超过1次。如果两个人在一秒内开火,一个人要等一秒。如果一秒内有3个呼叫,一个要等1秒,另一个要等2秒。

rxjs system.reactive
1个回答
0
投票

你可以使用 完成 观察到的东西,并结合 联合所有.

combineAll将在前一个观测点完成后发出下一个观测点。

1. 创建您的主题

const source$$ = new Subject();

2. 提供一个函数,将你的值映射到1000毫秒后完成的Observable。

const timeCompletedSource$ = (time) => (value) => Observable.create(observer => observer.next(v)).pipe(
  takeUntil(timer(time))
);

你不需要把函数time做成动态的,我只是把(time)=> (value)=>......因为我想写一个像throttle(1000)这样有动态时间范围的操作符。你可以直接写(value) =>Observable......如果你想要一个静态时间的话

3. 使用你的函数将你的值映射到一个有时间框的观测值上,并将所有观测值合并到concatAll中。

const result$ = source$$.pipe(
  map(timeCompletedSource$(time))
  concatAll()
);

你发现一个正在运行的stackblitz 此处

专业的。做一个自定义的操作者

const fastThrottle = (time) => (source) => source.pipe(
  map(timeCompletedSource$(1000)),
  concatAll()
)

const result$ = source$$.pipe(
  fastThrottle(1000)
);

0
投票

我最近发现自己也遇到了同样的情况,我消耗的api每秒钟只能接受4个请求。

这是我想出的办法。

一个速率限制管道

import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'

export function rateLimit<T>(
  count: number,
  slidingWindowTime: number,
  scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
  let tokens = count
  const tokenChanged = new BehaviorSubject(tokens)
  const consumeToken = () => tokenChanged.next(--tokens)
  const renewToken = () => tokenChanged.next(++tokens)
  const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))

  return mergeMap<T, Observable<T>>((value: T) =>
    availableTokens.pipe(
      take(1),
      map(() => {
        consumeToken()
        timer(slidingWindowTime, scheduler).subscribe(renewToken)
        return value
      }),
    ),
  )
}

你可以这样使用.我想从api中获取contractIds$中的所有合同.我只想每1000ms发送4个请求。

const contracts$ = contractIds$.pipe(
  rateLimit(4, 1000),
  mergeMap(contract => this.get(contract.DocumentNumber)),
)

也许这能帮助你:)

© www.soinside.com 2019 - 2024. All rights reserved.