使用TaskCompletionSource<T>异步等待而不加锁?

问题描述 投票:0回答:2

我有一个处理永无止境的数据流的类。我还有外部组件,希望以临时方式异步采样处理后的数据。

由于处理“下一个项目”可能需要一段时间,因此我希望数据检查器组件“等待”该项目,这样它们就不会阻塞。它们可以是任意数量的不同数据检查组件,如果多个检查员正在等待一个项目,那么如果它们都获得相同的项目实例就可以了。

我有下面的实现,使用

TaskCompletionSource
和锁来确保 tcs 在不同的线程上工作。它似乎工作正常,但如果可能的话,我想以无锁的方式做到这一点。

public class DataProcessor
{
  private object _sampleDataLock = new object();
  private TaskCompletionSource<Data> _sampleData;
  private Thread _processThread;

  public DataProcessor()
  {
     _processThread = new(ProcessDataLoop);
     _processThread.Start();
  }

  void ProcessDataLoop()
  {
      while (true)
      {
         var processedData = ProcessNextData();
         
         //This code in the lock is what I wonder about
         lock (_sampleDataLock)
         {
            _sampleData?.SetResult(processedData.Clone());
            _sampleData = null;
         }
      }
  }

  public Task<Data> SampleData(TimeSpan timeout)
  {
    // This is the code I'm wondering how to write without a lock
    lock (_sampleDataLock)
    {
      _sampleData ??= new TaskCompletionSource<Data>();
      return _sampleData.Task.WithTimeout(timeout);
    }
  }
}

public class DataInspector
{
    private DataProcessor _processor;

    public async Task InspectData()
    {
       //wait up to 30s for an item
       var data = await _processor.SampleData(TimeSpan.FromSeconds(30));
       // look at data
    }
}

我想知道是否有一种方法可以在不锁定的情况下做同样的事情。

c# async-await task taskcompletionsource
2个回答
1
投票

您可以使用系统可等待句柄来等待(例如,手动重置事件),而不是您的方法,并且您可以将对象存储在并发队列中。

然后,您的消费者遵循一种模式,尝试从并发队列中取出尽可能多的对象,然后异步等待事件脉冲。生产者每次将新项目推送到集合时都会触发该事件。

如果您想避免重新发明轮子,这里有一个异步集合的可靠实现,具有最大数量的项目和通过异步函数进行生产者/消费者排队:

https://github.com/StephenCleary/AsyncEx/blob/master/ src/Nito.AsyncEx.Coordination/AsyncCollection.cs


1
投票
您也许可以通过简单的

yield

来解决它。

public Task<Data> GetData(TimeSpan timeout) { while (true) { var processedData = ProcessNextData(); // no need for temporal coupling, we know know there is data var taskResult = new TaskCompletionSource<Data>(); taskResult.Task.WithTimeout(timeout); taskResult.SetResult(processedData.Clone()); yield taskResult; } }
或者,您也可以研究 

IAsyncEnumerable

 来生成可以使用 
Task
 使用的数据流。

© www.soinside.com 2019 - 2024. All rights reserved.