如何在 onNext 之前等待订阅者完成?

问题描述 投票:0回答:1

我对反应式编程非常陌生,并且我陷入了困境。

我尝试实现如果我的集合达到一定数量的元素或特定时间已经过去我需要触发一些方法。

使用目的:我将处理来自数据库的事件,但我需要在每 200 个事件后标记我停止的位置,或者每 10 秒保存一次最后处理的事件,因为可能不会有一个事件达到 200 个事件很久。但 OnNext 不等待完成 saveCheckPoint 方法并继续接受已处理的项目。为了数据的一致性,我需要在 OnNext 之前等待。

要模拟我的设计,您可以检查此代码,我怎样才能实现我的要求?

Subject<string> events = new Subject<string>();

events
    .Buffer(TimeSpan.FromSeconds(10), 200)
    .SelectMany(async x => await SaveCheckpoint(x)).Subscribe();


async Task<bool> SaveCheckpoint(IList<string> i)
{
    var lastProcessedEvent = i.LastOrDefault();

    if (lastProcessedEvent != null)
    {
        //Save checkpoint to db or somewhere else
        await Task.Delay(1000);
        Console.WriteLine($"Saved checkpoint of : {lastProcessedEvent}");
    }
   
    return await Task.FromResult(true);
}


for (int j = 1; j < 10000; j++)
{
    Console.WriteLine("Event processed");
    // This do not wait subscriber to finish batch
    events.OnNext($"Event - {j}");
}

我希望找到一种方法在 onNext 之前等待 Subscribe 方法完成

c# .net .net-6.0 reactive-programming system.reactive
1个回答
0
投票

对于我来说,你需要在没有异步/等待执行的情况下执行此操作,如下所示:

    events
        .Buffer(TimeSpan.FromSeconds(10), 200)
        .Do(x =>SaveCheckpoint(x).Wait())
        .Subscribe();
© www.soinside.com 2019 - 2024. All rights reserved.