在上一个呼叫完成之前,防止订阅者被并行呼叫的正确方法是什么?
我有一种竞争条件自动取款机,代码如下
SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
{
if(someBool)
{
await ...
Start();
}
else
{
await ...
Stop();
}
});
如果
SomeBool
变化很快,那么调用可能会像这样:
Start()
Stop()
Stop()
Start()
或更糟。我怎样才能确保它始终是
Start()
Stop()
Start()
Stop()
我可以将lock放在里面或使用某种队列来确保调用的顺序。但我希望存在这样的情况,或者我宁愿需要正确使用反应性概念,例如创建一个新的可观察对象或者谁知道会发生什么。
忘记添加 mcve。创建新的控制台应用程序,添加块:
ReactiveUI
和 ReactiveUI.Fody
。
class Program
{
static SomeReactive SomeReactive { get; } = new();
static void Main(string[] args)
{
SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
{
if (someBool)
{
await Task.Delay((int)(Random.Shared.NextDouble() * 100));
Console.WriteLine("start");
}
else
{
await Task.Delay((int)(Random.Shared.NextDouble() * 100));
Console.WriteLine("stop");
}
});
for (int i = 0; i < 10; i++)
{
SomeReactive.SomeBool = !SomeReactive.SomeBool;
Thread.Sleep(50);
}
Console.ReadKey();
}
}
class SomeReactive : ReactiveObject
{
[Reactive]
public bool SomeBool { get; set; }
}
如果我尝试这样做,我会使用同步,如下所示:
var locker = new object();
SomeReactive.WhenAnyValue(o => o.SomeBool)
.Synchronize(locker)
.Subscribe(async someBool =>
{
if(someBool)
{
await ...
Start();
}
else
{
await ...
Stop();
}
});
希望这适用于您的情况。
我回到你的问题,我有一些希望:
SomeReactive.WhenAnyValue(o => o.SomeBool)
.Select(flag => Observable.Create<Unit>(async _ =>
{
if (flag)
{
await Task.Delay(100);
Console.WriteLine("Start - select + concat");
}
else
{
await Task.Delay(300);
Console.WriteLine("Stop - select + concat");
}
}))
.Concat()
.Subscribe();
Select
在管道中创建一个新的 Observable
- 这里,我们有一个逻辑。接下来,我们需要保留顺序/不并行运行,因此使用 Concat
。最后需要Subscribe
。
在我的调查过程中,我还用信号量制作了较少的 RX 代码。
SemaphoreSlim semaphoreSlim = new(1);
SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async flag =>
{
await semaphoreSlim.WaitAsync();
try
{
if (flag)
{
await Task.Delay(100);
Console.WriteLine("Start - semaphore");
}
else
{
await Task.Delay(300);
Console.WriteLine("Stop - semaphore");
}
}
finally
{
semaphoreSlim.Release();
}
});