我有一个处理永无止境的数据流的类。我还有外部组件,希望不时对处理后的数据进行异步采样。由于处理“下一个项目”可能需要一段时间,因此我希望数据检查器组件“等待”该项目,这样它们就不会阻塞。它们可以是任意数量的不同数据检查组件,如果多个检查员正在等待一个项目,那么如果它们都获得相同的项目实例就可以了。
我有下面使用
TaskCompletionSource
的实现,它似乎工作正常,但数据处理器使用完整的线程来处理数据,而不是任务。如果我要切换到任务,那么 lock
就会导致问题。如果可能的话,我还想避免锁定,因为绝大多数时候没有任何东西试图检查这些项目。
public class DataProcessor
{
private object _getDataLock = new object();
private TaskCompletionSource<Data> _getData;
private Thread _processThread;
public DataProcessor()
{
_processThread = new(ProcessDataLoop);
_processThread.Start();
}
void ProcessDataLoop()
{
while (true)
{
var processedData = ProcessNextData();
lock (_getDataLock)
{
_getData?.SetResult(processedData.Clone());
_getData = null;
}
}
}
public Task<Data> GetData(TimeSpan timeout)
{
lock (_getDataLock)
{
_getData ??= new TaskCompletionSource<Data>();
return _getData.Task.WithTimeout(timeout);
}
}
}
public class DataInspector
{
private DataProcessor _processor;
public async Task InspectData()
{
var data = await _processor.GetData(TimeSpan.FromSeconds(30));
// look at data
}
}
我想知道是否有一种方法可以在不锁定的情况下做同样的事情。