RxJS 如果达到最大批量大小或在超过指定时间段内没有新输入,则使用 subject,bufferSize,takeuntil 进行 API 调用

问题描述 投票:0回答:2

我正在开发一个角度应用程序。

我不想对每条记录进行 API 调用,而是想在一定时间段后,当没有新项目添加到队列(可观察队列)时对一批项目进行 API 调用。

在我的例子中,如果有新项目添加到可观察的batchChangesQueue中超过5秒,那么我想对批次中所有累积的项目执行API调用。

但是,使用我尝试的代码,我总是得到单个项目而不是项目数组。 我尝试了使用 switchmap、concatmap、toArray 等的各种代码示例

switchMap(() => this.batchChangesQueue.pipe(toArray()))

concatMap(() => this.batchChangesQueue.pipe(toArray()))

还使用了

timer(5000).pipe(take(1))
takeUntil(timer(5000))

但似乎没有任何效果,任何人都可以建议我的代码中做错了什么,或者是否有一种更简单的方法来实现我想要实现的目标?

我的服务类有以下代码。

changeListener$ 正在使用 .next() 从不同的组件调用

@Injectable({
  providedIn: 'root'
})
export class EventPublisher {
  private changeListener$ = new Subject < any > ();
  private batchChangesQueue = new Subject < unknown > ();
  private bufferSize = 3;

  constructor() {
    this.processBatch();
    this.subscribeToDataChanges();
  }
  private subscribeToDataChanges() {
    this.changeListener$.subscribe(async(data: {
      name: string,
      changes: any
    }) => {
      await this.saveChanges(data.name, this.convertToString(data.changes));
    });
  }
  public async processBatch() {
    this.batchChangesQueue.pipe(
      debounceTime(5000),
      switchMap(() => this.batchChangesQueue.pipe(buffer(this.batchCellChangesQueue)))
    ).subscribe({
      next: batch => {
        console.log('Processing batch:', batch);
        // Perform API calls here
      },
      error: err => console.error('Error processing batch:', err),
    });
  }
  public async saveChanges(sheetName: string, cellChanges: any) {
    this.batchChangesQueue.next({
      name,
      cellChanges
    });
  }
}

angular typescript rxjs rxjs-observables rxjs-pipeable-operators
2个回答
0
投票

让我们首先隔离问题并重新创建一个最小的重现。

首先,让我们构建一个流来模仿您的

batchChangesQueue$

我们将创建一个函数来从给定的时间间隔获取随机整数,然后通过发出一个随机值来构建流,该随机值表示该值发出之前的毫秒数(100 毫秒到 2000 毫秒之间):

function randomIntFromInterval(min: number, max: number) {
  // min and max included
  return Math.floor(Math.random() * (max - min + 1) + min);
}

const batchChangesQueue$ = defer(() => {
  const value = randomIntFromInterval(100, 2000);
  return of(value).pipe(delay(value));
}).pipe(
  tap((value) => console.log(`Queue emitted value ${value}`)),
  repeat()
);

然后,让我们构建一个小模拟来处理您提到的批次

Perform API calls here

// simulate what could be an HTTP call to a service
function processBatch(values: number[]) {
  console.log(`Processing values at once: ${values}`);
  return of(true).pipe(
    delay(1000),
    tap(() => console.log(`Values processed`))
  );
}

现在,真正的代码来处理对项目进行排队并将它们运行到返回可观察值的流程批处理函数中:

batchChangesQueue$
  .pipe(
    bufferTime(5000),
    concatMap((values) => processBatch(values))
  )

以下是可能输出的示例:

Queue emitted value 1438
Queue emitted value 729
Queue emitted value 361
Queue emitted value 952
Processing values at once: 1438,729,361,952
Queue emitted value 558
Queue emitted value 631
Values processed
Queue emitted value 566
Queue emitted value 726
Queue emitted value 1271
Processing values at once: 558,631,566,726,1271
Queue emitted value 670
Values processed
Queue emitted value 1923
Queue emitted value 887
Queue emitted value 1248
Queue emitted value 329
Queue emitted value 402
Processing values at once: 670,1923,887,1248,329,402
Values processed

Stackblitz 现场演示


0
投票

我认为您可以简单地使用

bufferTime
运算符来指定时间和缓冲区限制。

bufferTime(5000, null, 3), // emits array every 5000ms or when 3 items collected

实现可能看起来像这样:

  private changeListener$ = new Subject<{name: string, changes: any}>();
  private changesQueue = new Subject<{sheetName: string, cellChanges: any}>();
  private bufferSize = 3;
  private bufferInterval = 5000;

  private batchChangesQueue = this.changesQueue.pipe(
    bufferTime(this.bufferInterval, null, this.bufferSize),
    filter(batch => !!batch.length) // Ignore empty batches
  );

  public processBatch() {
    this.batchChangesQueue.subscribe({
      next: batch => { console.log('Processing batch:', batch); /* Perform API calls here */ },
      error:  err => console.error('Error processing batch:', err),
    });
  }

注意:看起来您可以简化为使用单个主题,因为您订阅一个主题只是为了将结果发送到另一个主题。

© www.soinside.com 2019 - 2024. All rights reserved.