我有一个主题,每100毫秒得到一个int。这个数字在每次迭代时递增。我订阅了这个主题,得到50的块,然后等待几秒钟。
SequenceGenerator gen = new SequenceGenerator();
gen.subj.Buffer(50).Subscribe(
async x =>
{
Console.WriteLine($"1: {x[x.Count - 1]}------");
await Task.Delay(10000);
Console.WriteLine($"1:------");
},
x => Console.WriteLine("EX"),
() => Console.WriteLine("1 Done"));
SequenceGenerator的操作如下:
int i = 0;
while (true)
{
++i;
subj.OnNext(i.ToString());
await Task.Delay(1);
}
每当缓冲区被填满,"onnext "就会被调用。即使最后一个 "onnext "还没有完成。有什么方法可以调整这种情况吗?我希望只有在最后一个 "onnext "完成后,才会调用下一个 "onnext"。
1: 50------
(wait ~0.5s)
1: 100------
...
实际行为: 预期行为。
1: 50------
(wait 10 seconds)
1: 100------
...
有什么方法可以做到这一点吗?前面谢谢你们!最好的问候。
好吧,我想我有一个解决方案。我发现了一个小窍门 此处. Zip对我的这种做法帮助很大。我需要另一个主题,是ziped到我的流.至少这工作。
SequenceGenerator gen = new SequenceGenerator();
Subject<bool> lockReleaser = new Subject<bool>();
Stopwatch watch = new Stopwatch();
watch.Start();
var test = gen.messageQueue.Buffer(TimeSpan.FromSeconds(4),5)
.Zip(lockReleaser, (res, _) => res)
.Subscribe(async x =>
{
if (x.Count > 0)
{
Console.WriteLine($"{watch.Elapsed}: {x[x.Count - 1]}------");
await Task.Delay(10000);
}
lockReleaser .OnNext(true);
Console.WriteLine($"1:------");
},
x => Console.WriteLine("EX"),
() => Console.WriteLine("1 Done"));
lockReleaser.OnNext(true);