所以我正在运行一个
Parallel.ForEach
,它基本上会生成一堆数据,这些数据最终将保存到数据库中。然而,由于数据收集可能会变得相当大,我需要能够偶尔保存/清除收集,以免遇到 OutOfMemoryException
。
我刚开始使用
Parallel.ForEach
、并发集合和锁,所以我对到底需要做什么来确保一切正常工作有点模糊(即我们没有将任何记录添加到集合中)保存和清除操作之间)。
目前我说的是,如果记录计数高于某个阈值,则将数据保存在当前集合中的
lock
块内。
ConcurrentStack<OutRecord> OutRecs = new ConcurrentStack<OutRecord>();
object StackLock = new object();
Parallel.ForEach(inputrecords, input =>
{
lock(StackLock)
{
if (OutRecs.Count >= 50000)
{
Save(OutRecs);
OutRecs.Clear();
}
}
OutRecs.Push(CreateOutputRecord(input);
});
if (OutRecs.Count > 0) Save(OutRecs);
我不能 100% 确定这是否像我想象的那样有效。锁是否会阻止循环的其他实例写入输出集合?如果没有的话有更好的方法吗?
您的锁将正常工作,但效率不会很高,因为所有您的工作线程将在每个保存操作的整个持续时间内被迫暂停。此外,锁往往(相对)昂贵,因此在每个线程的每次迭代中执行锁有点浪费。
您的评论之一提到为每个工作线程提供自己的数据存储:是的,您可以这样做。以下是您可以根据自己的需求进行定制的示例:
Parallel.ForEach(
// collection of objects to iterate over
inputrecords,
// delegate to initialize thread-local data
() => new List<OutRecord>(),
// body of loop
(inputrecord, loopstate, localstorage) =>
{
localstorage.Add(CreateOutputRecord(inputrecord));
if (localstorage.Count > 1000)
{
// Save() must be thread-safe, or you'll need to wrap it in a lock
Save(localstorage);
localstorage.Clear();
}
return localstorage;
},
// finally block gets executed after each thread exits
localstorage =>
{
if (localstorage.Count > 0)
{
// Save() must be thread-safe, or you'll need to wrap it in a lock
Save(localstorage);
localstorage.Clear();
}
});
一种方法是定义一个代表数据目的地的抽象。可能是这样的:
public interface IRecordWriter<T> // perhaps come up with a better name.
{
void WriteRecord(T record);
void Flush();
}
并行处理记录的类不需要担心这些记录是如何处理的,或者当记录太多时会发生什么。
IRecordWriter
的实现处理所有这些细节,使您的其他类更容易测试。
IRecordWriter
的实现可能看起来像这样:
public abstract class BufferedRecordWriter<T> : IRecordWriter<T>
{
private readonly ConcurrentQueue<T> _buffer = new ConcurrentQueue<T>();
private readonly int _maxCapacity;
private bool _flushing;
public ConcurrentQueueRecordOutput(int maxCapacity = 100)
{
_maxCapacity = maxCapacity;
}
public void WriteRecord(T record)
{
_buffer.Enqueue(record);
if (_buffer.Count >= _maxCapacity && !_flushing)
Flush();
}
public void Flush()
{
_flushing = true;
try
{
var recordsToWrite = new List<T>();
while (_buffer.TryDequeue(out T dequeued))
{
recordsToWrite.Add(dequeued);
}
if(recordsToWrite.Any())
WriteRecords(recordsToWrite);
}
finally
{
_flushing = false;
}
}
protected abstract void WriteRecords(IEnumerable<T> records);
}
当缓冲区达到最大大小时,其中的所有记录都将发送到
WriteRecords
。因为 _buffer
是 ConcurrentQueue
,所以即使添加记录,它也可以继续读取记录。
该
Flush
方法可以是任何特定于您如何编写记录的方法。这不是一个抽象类,数据库或文件的实际输出可能是注入到这个类中的另一个依赖项。您可以做出这样的决定、重构并改变主意,因为最初的类不会受到这些更改的影响。它所知道的只是不会改变的 IRecordWriter
界面。
您可能会注意到,我还没有绝对确定
Flush
不会在不同线程上同时执行。我可以对此进行更多锁定,但这并不重要。这将避免大多数并发执行,但如果并发执行都从 ConcurrentQueue
读取也没关系。
这只是一个粗略的轮廓,但它显示了如果我们将它们分开,所有步骤将如何变得更简单且更容易测试。一类将输入转换为输出。另一个类缓冲输出并写入它们。第二类甚至可以分为两部分 - 一个作为缓冲区,另一个作为“最终”编写器,将它们发送到数据库或文件或其他目的地。
锁是否会阻止循环的其他实例写入输出集合?
不,不是。其他线程仍然可以写入
OutRecs
集合,因为 OutRecs.Push
操作不受相同锁对象 (StackLock
) 的保护。这会使您的程序不正确:某些 OutRecord
对象可能不会保存在数据库中。这是一个经典的“竞争条件”,具有非确定性行为。
如果没有,有更好的方法吗?是的,您可以使用非线程安全集合,例如
List<T>
,并在
Parallel.ForEach
循环内同步与集合的每个交互。并行循环完成后,不需要同步(解释here)。示例:
List<OutRecord> results = new();
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(inputRecords, options, input =>
{
OutRecord result = CreateOutputRecord(input);
lock(results)
{
results.Add(result);
if (results.Count >= 50000)
{
Save(results);
results.Clear();
}
}
});
if (results.Count > 0) Save(results);
这样,所有并行活动都将停止,同时保存 50,000 个
OutRecord
,因为所有其他工作线程将在
lock
语句上被阻塞。如果您希望在 Save
期间继续工作,您将需要实现生产者-消费者模式。您可以在此处找到示例。