带有单个任务的异步OnNext

问题描述 投票:0回答:1

我有一个主题,每100毫秒得到一个int。这个数字在每次迭代时递增。我订阅了这个主题,得到50的块,然后等待几秒钟。

SequenceGenerator gen = new SequenceGenerator();
gen.subj.Buffer(50).Subscribe(
    async x =>
    {
        Console.WriteLine($"1: {x[x.Count - 1]}------");
        await Task.Delay(10000);
        Console.WriteLine($"1:------");
    },
    x => Console.WriteLine("EX"),
    () => Console.WriteLine("1 Done"));

SequenceGenerator的操作如下:

int i = 0;
while (true)
{
    ++i;
    subj.OnNext(i.ToString());
    await Task.Delay(1);
}

每当缓冲区被填满,"onnext "就会被调用。即使最后一个 "onnext "还没有完成。有什么方法可以调整这种情况吗?我希望只有在最后一个 "onnext "完成后,才会调用下一个 "onnext"。

1: 50------
(wait ~0.5s)
1: 100------
...

实际行为: 预期行为。

1: 50------
(wait 10 seconds)
1: 100------
...

有什么方法可以做到这一点吗?前面谢谢你们!最好的问候。

.net system.reactive reactive subject
1个回答
0
投票

好吧,我想我有一个解决方案。我发现了一个小窍门 此处. Zip对我的这种做法帮助很大。我需要另一个主题,是ziped到我的流.至少这工作。

SequenceGenerator gen = new SequenceGenerator();
Subject<bool> lockReleaser = new Subject<bool>();

Stopwatch watch = new Stopwatch();
watch.Start();
var test = gen.messageQueue.Buffer(TimeSpan.FromSeconds(4),5)
        .Zip(lockReleaser, (res, _) => res)
    .Subscribe(async x =>
    {
        if (x.Count > 0)
        {
            Console.WriteLine($"{watch.Elapsed}: {x[x.Count - 1]}------");
            await Task.Delay(10000);
        }
        lockReleaser .OnNext(true);
        Console.WriteLine($"1:------");
    },
    x => Console.WriteLine("EX"),
    () => Console.WriteLine("1 Done"));
lockReleaser.OnNext(true);
© www.soinside.com 2019 - 2024. All rights reserved.