使用 TaskCompletionSource<T>,以便调用者可以异步等待下一个项目

问题描述 投票:0回答:1

我有一个处理永无止境的数据流的类。我还有外部组件,希望不时对处理后的数据进行异步采样。由于处理“下一个项目”可能需要一段时间,因此我希望数据检查器组件“等待”该项目,这样它们就不会阻塞。它们可以是任意数量的不同数据检查组件,如果多个检查员正在等待一个项目,那么如果它们都获得相同的项目实例就可以了。

我有下面使用

TaskCompletionSource
的实现,它似乎工作正常,但数据处理器使用完整的线程来处理数据,而不是任务。如果我要切换到任务,那么
lock
就会导致问题。如果可能的话,我还想避免锁定,因为绝大多数时候没有任何东西试图检查这些项目。

public class DataProcessor
{
  private object _getDataLock = new object();
  private TaskCompletionSource<Data> _getData;
  private Thread _processThread;

  public DataProcessor()
  {
     _processThread = new(ProcessDataLoop);
     _processThread.Start();
  }

  void ProcessDataLoop()
  {
      while (true)
      {
         var processedData = ProcessNextData();
         lock (_getDataLock)
         {
            _getData?.SetResult(processedData.Clone());
            _getData = null;
         }
      }
  }

  public Task<Data> GetData(TimeSpan timeout)
  {
    lock (_getDataLock)
    {
      _getData ??= new TaskCompletionSource<Data>();
      return _getData.Task.WithTimeout(timeout);
    }
  }
}

public class DataInspector
{
    private DataProcessor _processor;

    public async Task InspectData()
    {
       var data = await _processor.GetData(TimeSpan.FromSeconds(30));
       // look at data
    }
}

我想知道是否有一种方法可以在不锁定的情况下做同样的事情。

c# async-await task taskcompletionsource
1个回答
0
投票

您可以使用系统可等待句柄来等待(例如,手动重置事件),而不是您的方法,并且您可以将对象存储在并发队列中。

然后,您的消费者遵循一种模式,尝试从并发队列中取出尽可能多的对象,然后异步等待事件脉冲。生产者每次将新项目推送到集合时都会触发该事件。

如果您想避免重新发明轮子,这里有一个异步集合的可靠实现,具有最大数量的项目和通过异步函数进行生产者/消费者排队:

https://github.com/StephenCleary/AsyncEx/blob/master/ src/Nito.AsyncEx.Coordination/AsyncCollection.cs

© www.soinside.com 2019 - 2024. All rights reserved.