如何对包含异步代码的 WhenAnyValue 订阅者调用进行排队?

问题描述 投票:0回答:2

在上一个呼叫完成之前,防止订阅者被并行呼叫的正确方法是什么?

我有一种竞争条件自动取款机,代码如下

SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
{
    if(someBool)
    {
        await ...
        Start();
    }
    else
    {
        await ...
        Stop();
    }
});

如果

SomeBool
变化很快,那么调用可能会像这样:

Start()
Stop()
Stop()
Start()

或更糟。我怎样才能确保它始终是

Start()
Stop()
Start()
Stop()

我可以将lock放在里面或使用某种队列来确保调用的顺序。但我希望存在这样的情况,或者我宁愿需要正确使用反应性概念,例如创建一个新的可观察对象或者谁知道会发生什么。


忘记添加 mcve。创建新的控制台应用程序,添加块:

ReactiveUI
ReactiveUI.Fody

class Program
{
    static SomeReactive SomeReactive { get; } = new();

    static void Main(string[] args)
    {
        SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
        {
            if (someBool)
            {
                await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                Console.WriteLine("start");
            }
            else
            {
                await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                Console.WriteLine("stop");
            }
        });

        for (int i = 0; i < 10; i++)
        {
            SomeReactive.SomeBool = !SomeReactive.SomeBool;
            Thread.Sleep(50);
        }

        Console.ReadKey();
    }
}

class SomeReactive : ReactiveObject
{
    [Reactive]
    public bool SomeBool { get; set; }
}
c# reactiveui
2个回答
0
投票

如果我尝试这样做,我会使用同步,如下所示:

var locker = new object();
SomeReactive.WhenAnyValue(o => o.SomeBool)
    .Synchronize(locker)
    .Subscribe(async someBool =>
    {
        if(someBool)
        {
            await ...
            Start();
        }
        else
        {
            await ...
            Stop();
        }
    });

希望这适用于您的情况。


0
投票

我回到你的问题,我有一些希望:

SomeReactive.WhenAnyValue(o => o.SomeBool)
    .Select(flag => Observable.Create<Unit>(async _ =>
    {
        if (flag)
        {
            await Task.Delay(100);
            Console.WriteLine("Start - select + concat");
        }
        else
        {
            await Task.Delay(300);
            Console.WriteLine("Stop - select + concat");
        }
    }))
    .Concat()
    .Subscribe();

Select
在管道中创建一个新的
Observable
- 这里,我们有一个逻辑。接下来,我们需要保留顺序/不并行运行,因此使用
Concat
。最后需要
Subscribe

在我的调查过程中,我还用信号量制作了较少的 RX 代码。

SemaphoreSlim semaphoreSlim = new(1);
SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async flag =>
{
    await semaphoreSlim.WaitAsync();

    try
    {
        if (flag)
        {
            await Task.Delay(100);
            Console.WriteLine("Start - semaphore");
        }
        else
        {
            await Task.Delay(300);
            Console.WriteLine("Stop - semaphore");
        }
    }
    finally
    {
        semaphoreSlim.Release();
    }
});
© www.soinside.com 2019 - 2024. All rights reserved.