在 Parallel.ForEach 中将变量保留在线程上

问题描述 投票:0回答:1

我正在寻求并行处理依赖于对象的任务(

State
),它不是线程安全的,并且其构造非常耗时

出于这个原因,我正在研究分区局部变量,但要么我做错了,要么在寻找其他东西。这或多或少代表了我当前的实现:

Parallel.ForEach<string, State>(folders, config, () => new State(), (source, loopState, index, threadState) =>
{
    var content = File.ReadAllText(source);        // read file
    var result = threadState.doSomething(content); // do something
    File.WriteAllText(outputFile, result);         // write output
    return threadState;
}, (threadState) => { });

但是,我在

Console.WriteLine
初始化程序中添加了
State
,并且我发现对于循环的每次迭代,都会调用
State
构造函数,从而导致性能大幅下降。我希望将一个线程中 State
instance
传递给同一线程上的后续迭代。

我怎样才能实现这一目标?

c# multithreading parallel-processing task-parallel-library parallel.foreach
1个回答
2
投票

您有几个选择。最简单的是创建单个

State
对象,并将其与
lock
语句同步:

State state = new();

Parallel.ForEach(folders, parallelOptions, folder =>
{
    string content = File.ReadAllText(folder);
    string result;
    lock (state) { result = state.DoSomething(content); }
    File.WriteAllText(outputFile, result);
});

我认为这是不可行的,因为

DoSomething
方法非常耗时,并且同步它会违背并行化的目的。

另一种选择是使用

ThreadLocal<State>
。此类提供数据的线程本地存储,因此创建的
State
对象的数量将等于
Parallel.ForEach
使用的线程数。

using ThreadLocal<State> threadLocalState = new(() => new State());

Parallel.ForEach(folders, parallelOptions, folder =>
{
    string content = File.ReadAllText(folder);
    string result = threadLocalState.Value.DoSomething(content);
    File.WriteAllText(outputFile, result);
});

这可能会创建比

State
重载更少的
Parallel.ForEach<TSource, TLocal>
对象,但仍然不等于配置的
MaxDegreeOfParallelism
Parallel.ForEach
使用来自
ThreadPool
的线程,并且很可能在计算过程中使用所有线程,前提是
folders
的列表足够长。而且你几乎无法控制
ThreadPool
的大小。所以这也不是一个特别诱人的解决方案。

我能想到的第三个也是最后一个选项是创建一个

State
对象池,并且每个循环中都有一个
Rent
/
Return

ObjectPool<State> statePool = new(() => new State());

Parallel.ForEach(folders, parallelOptions, folder =>
{
    State state = statePool.Rent();
    string content = File.ReadAllText(folder);
    string result = state.DoSomething(content);
    File.WriteAllText(outputFile, result);
    statePool.Return(state);
});

这样实例化的

State
对象的数量将等于最大并行度。

唯一的问题是标准 .NET 库中没有

ObjectPool<T>
类(只有
ArrayPool<T>
类),因此您必须找到一个。这是一个基于
ConcurrentBag<T>
的简单实现:

public class ObjectPool<T> : IEnumerable<T> where T : new()
{
    private readonly ConcurrentBag<T> _bag = new ConcurrentBag<T>();
    private readonly Func<T> _factory;

    public ObjectPool(Func<T> factory = null) => _factory = factory;

    public T Rent()
    {
        if (_bag.TryTake(out var obj)) return obj;
        return _factory is not null ? _factory() : new T();
    }

    public void Return(T obj) => _bag.Add(obj);

    public IEnumerator<T> GetEnumerator() => _bag.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

更新:

ObjectPool<T>
类存在于 Microsoft.Extensions.ObjectPool 包中。它的 API 与上面显示的自定义
ObjectPool<T>
实现略有不同。

© www.soinside.com 2019 - 2024. All rights reserved.