RxJS:速率限制和同时限制在不同位置发出的HTTP请求

问题描述 投票:0回答:1

我正在使用外部API编写脚本,该脚本需要基于以下条件限制请求:

  • 每秒最大请求数
  • 最大当前请求数

我发现并实现了针对单个请求的这种行为,但是我需要进行以下工作流程:

  • 分页处理
  • 向同一API发出两种不同类型的请求,第二种依赖于第一个。

下面的代码段说明了请求的工作流程,我尝试了一些使用mergeMapexpand concurrent参数的事情,以及在这里发现的一些技巧,这些技巧我说对一个请求有效,但是我对此有些困惑如何跟踪所有请求以“同步”所有请求中的限制。

 * Get a page of 1000 companies and the total number of companies.
 *
 * @param afterId Optional id of the last fetched company (to get the next ones).
 */
function getCompanies(afterId?: string): Observable<{ count: number; companies: Company[] }> {
  const options = { limit: 1000, afterId }
  return this.http.post('https://example.com/searches/companies', options)
}

function getAllCompanies(): Observable<Company[]> {
  let alreadyFetchedCount = 0
  this.getCompanies().pipe(
    expand(({ count, companies }) =>
      count <= alreadyFetchedCount ? EMPTY : this.getCompanies(companies[companies.length - 1].uuid)
    ),
    tap(({ companies }) => (alreadyFetchedCount += companies.length))
  )
}

/**
 * Get a page of 1000 funding rounds.
 *
 * @param companyUuid The funding rounds company uuid.
 * @param afterId Optional id of the last fetched company (to get the next ones).
 */
function getFundingRounds(
  companyUuid: string,
  afterId?: string
): Observable<{ count: number; fundingRounds: FundingRound[] }> {
  const options = { limit: 1000, companyUuid, afterId }
  return this.http.post('https://example.com/searches/companies', options)
}

function getAllFundingRounds(companyUuid: string): Observable<FundingRound[]> {
  let alreadyFetchedCount = 0
  this.getFundingRounds().pipe(
    expand(({ count, fundingrounds }) =>
      count <= alreadyFetchedCount ? EMPTY : this.getFundingRounds(fundingrounds[fundingrounds.length - 1].uuid)
    ),
    tap(({ fundingrounds }) => (alreadyFetchedCount += fundingrounds.length)),
    reduce((acc, value) => [...acc, ...value], [])
  )
}

function main() {
  getAllCompanies().pipe(
    // Here I get a stream of 1000 or less companies until all companies have been fetched.
    // Let's work one company by one company.
    mergeMap((companies) => companies),
    // For each company, get the funding rounds and return the company extended with them.
    mergeMap((company) =>
      getAllFundingRounds(company.uuid).pipe(map((fundingRounds) => ({ ...company, fundingRounds })))
    ),
    toArray(),
    tap(companies =>
      // Do something with the result
    )
  )
}

rxjs rxjs6
1个回答
0
投票

您可以使用delay来确保您在指定时间内发出http请求,而concatMap可以帮助您按顺序进行请求

this.function1().pipe(
  mergeAll(),
  delay(1000),
  concatMap((data: any) => this.function2(data.id)),
).subscribe(
  console.log
);

function2(id: string = ''): Observable<any> {
console.log('id', id);
return serviceCAll().pipe(
  delay(1000)
 );
}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.