我正在寻求并行处理依赖于对象的任务(
State
),它不是线程安全的,并且其构造非常耗时。
出于这个原因,我正在研究分区局部变量,但要么我做错了,要么在寻找其他东西。这或多或少代表了我当前的实现:
Parallel.ForEach<string, State>(folders, config, () => new State(), (source, loopState, index, threadState) =>
{
var content = File.ReadAllText(source); // read file
var result = threadState.doSomething(content); // do something
File.WriteAllText(outputFile, result); // write output
return threadState;
}, (threadState) => { });
但是,我在
Console.WriteLine
初始化程序中添加了 State
,并且我发现对于循环的每次迭代,都会调用 State
构造函数,从而导致性能大幅下降。我希望将一个线程中 State
的 instance传递给同一线程上的后续迭代。
我怎样才能实现这一目标?
您有几个选择。最简单的是创建单个
State
对象,并将其与 lock
语句同步:
State state = new();
Parallel.ForEach(folders, parallelOptions, folder =>
{
string content = File.ReadAllText(folder);
string result;
lock (state) { result = state.DoSomething(content); }
File.WriteAllText(outputFile, result);
});
我认为这是不可行的,因为
DoSomething
方法非常耗时,并且同步它会违背并行化的目的。
ThreadLocal<State>
。此类提供数据的线程本地存储,因此创建的 State
对象的数量将等于 Parallel.ForEach
使用的线程数。
using ThreadLocal<State> threadLocalState = new(() => new State());
Parallel.ForEach(folders, parallelOptions, folder =>
{
string content = File.ReadAllText(folder);
string result = threadLocalState.Value.DoSomething(content);
File.WriteAllText(outputFile, result);
});
这可能会创建比
State
重载更少的 Parallel.ForEach<TSource, TLocal>
对象,但仍然不等于配置的 MaxDegreeOfParallelism
。 Parallel.ForEach
使用来自 ThreadPool
的线程,并且很可能在计算过程中使用所有线程,前提是 folders
的列表足够长。而且你几乎无法控制 ThreadPool
的大小。所以这也不是一个特别诱人的解决方案。
我能想到的第三个也是最后一个选项是创建一个
State
对象池,并且每个循环中都有一个 Rent
/Return
:
ObjectPool<State> statePool = new(() => new State());
Parallel.ForEach(folders, parallelOptions, folder =>
{
State state = statePool.Rent();
string content = File.ReadAllText(folder);
string result = state.DoSomething(content);
File.WriteAllText(outputFile, result);
statePool.Return(state);
});
这样实例化的
State
对象的数量将等于最大并行度。
唯一的问题是标准 .NET 库中没有
ObjectPool<T>
类(只有 ArrayPool<T>
类),因此您必须找到一个。这是一个基于 ConcurrentBag<T>
的简单实现:
public class ObjectPool<T> : IEnumerable<T> where T : new()
{
private readonly ConcurrentBag<T> _bag = new ConcurrentBag<T>();
private readonly Func<T> _factory;
public ObjectPool(Func<T> factory = null) => _factory = factory;
public T Rent()
{
if (_bag.TryTake(out var obj)) return obj;
return _factory is not null ? _factory() : new T();
}
public void Return(T obj) => _bag.Add(obj);
public IEnumerator<T> GetEnumerator() => _bag.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
ObjectPool<T>
类存在于 Microsoft.Extensions.ObjectPool 包中。它的 API 与上面显示的自定义 ObjectPool<T>
实现略有不同。