我有一个处理永无止境的数据流的类。我还有外部组件,希望以临时方式异步采样处理后的数据。
由于处理“下一个项目”可能需要一段时间,因此我希望数据检查器组件“等待”该项目,这样它们就不会阻塞。它们可以是任意数量的不同数据检查组件,如果多个检查员正在等待一个项目,那么如果它们都获得相同的项目实例就可以了。
我有下面的实现,使用
TaskCompletionSource
和锁来确保 tcs 在不同的线程上工作。它似乎工作正常,但如果可能的话,我想以无锁的方式做到这一点。
public class DataProcessor
{
private object _sampleDataLock = new object();
private TaskCompletionSource<Data> _sampleData;
private Thread _processThread;
public DataProcessor()
{
_processThread = new(ProcessDataLoop);
_processThread.Start();
}
void ProcessDataLoop()
{
while (true)
{
var processedData = ProcessNextData();
//This code in the lock is what I wonder about
lock (_sampleDataLock)
{
_sampleData?.SetResult(processedData.Clone());
_sampleData = null;
}
}
}
public Task<Data> SampleData(TimeSpan timeout)
{
// This is the code I'm wondering how to write without a lock
lock (_sampleDataLock)
{
_sampleData ??= new TaskCompletionSource<Data>();
return _sampleData.Task.WithTimeout(timeout);
}
}
}
public class DataInspector
{
private DataProcessor _processor;
public async Task InspectData()
{
//wait up to 30s for an item
var data = await _processor.SampleData(TimeSpan.FromSeconds(30));
// look at data
}
}
我想知道是否有一种方法可以在不锁定的情况下做同样的事情。
yield
来解决它。
public Task<Data> GetData(TimeSpan timeout)
{
while (true)
{
var processedData = ProcessNextData();
// no need for temporal coupling, we know know there is data
var taskResult = new TaskCompletionSource<Data>();
taskResult.Task.WithTimeout(timeout);
taskResult.SetResult(processedData.Clone());
yield taskResult;
}
}
或者,您也可以研究 IAsyncEnumerable
来生成可以使用
Task
使用的数据流。