停止两个异步方法与另一个同时运行

问题描述 投票:3回答:5

我有两个异步函数,我将其称为ChangeState()DoThing()。他们每个人都在等待下游的异步方法。这些是从事件处理程序调用的,因此它们在执行时不会阻止任何其他代码。如果ChangeState()被召唤,那么DoThing()在任何先前的ChangeState()完成之前不会做它的事情是必要的。当ChangeState()仍在执行时,可以再次调用它。在DoThing()可以继续之前,任何执行都应该在DoThing()之前完成。

反之亦然; ChangeState()应该等到任何以前运行的DoStuff()完成。

如何在没有死锁危险的情况下实现这一点?

我知道在锁定语句中不允许等待,这是有充分理由的,这就是为什么我不想重新创建该功能。

async void ChangeState(bool state)
{
 //Wait here until any pending DoStuff() is complete.
 await OutsideApi.ChangeState(state);
}

async void DoStuff()
{
 //Wait here until any pending ChangeState() is complete.
 await OutsideApi.DoStuff();
}
c# .net multithreading concurrency async-await
5个回答
2
投票

根据您的要求,似乎像ReaderWriterLock可以帮助您。此外,既然你有async方法,你应该使用async锁。不幸的是,没有等待.NET框架本身提供的ReaderWriterLock锁。幸运的是,你可以看看AsyncEx库或这个article。使用AsyncEx的示例。

var readerWriterLock = new AsyncReaderWriterLock();

async void ChangeState(bool state)
{
    using(await readerWriterLock.ReaderLockAsync())
    {
        await OutsideApi.ChangeState(state);
    }
}

async void DoStuff()
{
    using(await readerWriterLock.WriterLockAsync())
    {
        await OutsideApi.DoStuff();
    }
}

注:这个解决方案仍然有限制,DoStuff调用不能并发,写入锁定,但仍然是调用的顺序和在DoStuff之前完成所有ChangeState的要求,反之亦然。(来自@Scott张伯伦的提示使用两个读者和作家锁)


0
投票

您可以使用ManualResetEvent或AutoResetEvent来表示线程已完成,以便另一个线程可以继续工作。

一些样品可以找到herehere


0
投票

编辑:第一个解决方案不符合要求。

  1. 创建自定义锁类。 此类跟踪从哪种类型(ChangeState和DoThing)运行的实例数,并提供检查任务是否可以运行的方法。
        public class CustomLock
        {
            private readonly int[] Running;
            private readonly object _lock;
            public CustomLock(int Count)
            {
                Running = new int[Count];
                _lock = new object();
            }

            public void LockOne(int Task)
            {
                lock (_lock)
                {
                    Running[Task]++;
                }
            }

            public void UnlockOne(int Task)
            {
                lock (_lock)
                {
                    Running[Task]--;
                }
            }

            public bool Locked(int Task)
            {
                lock (_lock)
                {
                    for (int i = 0; i < Running.Length; i++)
                    {
                        if (i != Task && Running[i] != 0)
                            return true;
                    }
                    return false;
                }
            }
        }
  1. 更改现有代码。 ChangeState将是任务0,DoStuff将是任务1。
private CustomLock Lock = new CustomLock(2); //Create a new instance of the class for 2 tasks

async Task ChangeState(bool state)
{
   while (Lock.Locked(0)) //Wait for the task to get unlocked
      await Task.Delay(10);
   Lock.LockOne(0); //Lock this task
   await OutsideApi.ChangeState(state);
   Lock.UnlockOne(0); //Task finished, unlock one
}

async Task DoStuff()
{
   while (Lock.Locked(1))
      await Task.Delay(10);
   Lock.LockOne(1);
   await OutsideApi.DoStuff();
   Lock.UnlockOne(1);
}

当任何ChangeState运行时,可以在不等待的情况下启动新的,但是当调用DoStuff时,它将等待直到所有ChangeStates完成,这也是另一种方式。


0
投票

我为实践了一个名为KeyedLock的同步原语,它允许一次只有一个键的并发异步操作。所有其他键都排队,稍后批量解锁(按键)。该类旨在使用如下:

KeyedLock _keyedLock;

async Task ChangeState(bool state)
{
    using (await this._keyedLock.LockAsync("ChangeState"))
    {
        await OutsideApi.ChangeState(state);
    }
}

async Task DoStuff()
{
    using (await this._keyedLock.LockAsync("DoStuff"))
    {
        await OutsideApi.DoStuff();
    }
}

例如,下面的调用:

await ChangeState(true);
await DoStuff();
await DoStuff();
await ChangeState(false);
await DoStuff();
await ChangeState(true);

......将按此顺序执行:

ChangeState(true);
ChangeState(false); // concurrently with the above
ChangeState(true); // concurrently with the above
DoStuff(); // after completion of the above
DoStuff(); // concurrently with the above
DoStuff(); // concurrently with the above

KeyedLock类:

class KeyedLock
{
    private object _currentKey;
    private int _currentCount = 0;
    private WaitingQueue _waitingQueue = new WaitingQueue();
    private readonly object _locker = new object();

    public Task WaitAsync(object key, CancellationToken cancellationToken)
    {
        if (key == null) throw new ArgumentNullException(nameof(key));
        lock (_locker)
        {
            if (_currentKey != null && key != _currentKey)
            {
                var waiter = new TaskCompletionSource<bool>();
                _waitingQueue.Enqueue(new KeyValuePair<object,
                    TaskCompletionSource<bool>>(key, waiter));
                if (cancellationToken != null)
                {
                    cancellationToken.Register(() => waiter.TrySetCanceled());
                }
                return waiter.Task;
            }
            else
            {
                _currentKey = key;
                _currentCount++;
                return cancellationToken.IsCancellationRequested ?
                    Task.FromCanceled(cancellationToken) : Task.FromResult(true);
            }
        }
    }
    public Task WaitAsync(object key) => WaitAsync(key, CancellationToken.None);

    public void Release()
    {
        List<TaskCompletionSource<bool>> tasksToRelease;
        lock (_locker)
        {
            if (_currentCount <= 0) throw new InvalidOperationException();
            _currentCount--;
            if (_currentCount > 0) return;
            _currentKey = null;
            if (_waitingQueue.Count == 0) return;
            var newWaitingQueue = new WaitingQueue();
            tasksToRelease = new List<TaskCompletionSource<bool>>();
            foreach (var entry in _waitingQueue)
            {
                if (_currentKey == null || entry.Key == _currentKey)
                {
                    _currentKey = entry.Key;
                    _currentCount++;
                    tasksToRelease.Add(entry.Value);
                }
                else
                {
                    newWaitingQueue.Enqueue(entry);
                }
            }
            _waitingQueue = newWaitingQueue;
        }
        foreach (var item in tasksToRelease)
        {
            item.TrySetResult(true);
        }
    }
    private class WaitingQueue :
        Queue<KeyValuePair<object, TaskCompletionSource<bool>>>
    { }

    public Task<Releaser> LockAsync(object key,
        CancellationToken cancellationToken)
    {
        var waitTask = this.WaitAsync(key, cancellationToken);
        return waitTask.ContinueWith(
            (_, state) => new Releaser((KeyedLock)state),
            this, cancellationToken,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default
        );
    }
    public Task<Releaser> LockAsync(object key)
        => LockAsync(key, CancellationToken.None);

    public struct Releaser : IDisposable
    {
        private readonly KeyedLock _parent;
        internal Releaser(KeyedLock parent) { _parent = parent; }
        public void Dispose() { _parent?.Release(); }
    }

}

-3
投票

这似乎非常适合一对ReaderWriterLockSlims。

private readonly ReaderWriterLockSlim changeStateLock = new ReaderWriterLockSlim();
private readonly ReaderWriterLockSlim doStuffLock = new ReaderWriterLockSlim();

一个控制访问ChangeState,另一个控制访问DoStuff

读取器锁定用于表示正在执行方法,并且写入器锁定用于表示正在执行其他方法。 ReaderWriterLockSlim允许多次读取,但写入是独占的。

Task.Yield只是为了让控制权重新回到来电者,因为ReaderWriterLockSlim的阻挡。

async Task ChangeState(bool state)
{
    await Task.Yield();

    doStuffLock.EnterWriteLock();

    try
    {
        changeStateLock.EnterReadLock();

        try
        {
            await OutsideApi.ChangeState(state);
        }
        finally
        {
            changeStateLock.ExitReadLock();
        }
    }
    finally
    {
        doStuffLock.ExitWriteLock();
    }
}

async Task DoStuff()
{
    await Task.Yield();

    changeStateLock.EnterWriteLock();

    try
    {
        doStuffLock.EnterReadLock();

        try
        {
            await OutsideApi.DoStuff();
        }
        finally
        {
            doStuffLock.ExitReadLock();
        }
    }
    finally
    {
        changeStateLock.ExitWriteLock();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.