我有一个TransformBlock<int, int>
有MaxDegreeOfParallelism = 6
。我还发现传递给块的构造函数的Func<int, int>
(将为每个已发布的项执行)可以在逻辑上分解为一个昂贵的初始化例程和一个改变函数局部变量的主体。如果我可以将函数重构为一个名为TransformBlockState
的类,每个并发操作执行一次初始化(就像Parallel.For
的localInit
回调一样)然后允许TPL Dataflow确保状态永远不会被多个项目突变,那么效率会更高。一时间
在重构之前:
Func<int, int> original = x => {
// method local variables
// expensive initialization routine to setup locals
// perform action on local variables
// potentially expensive teardown
}
重构后:
public sealed class TransformBlockState<TIn, TOut> : IDisposable
{
// instance state
public TransformBlockState()
{
// expensive initialization routine
}
public TOut Transform(TIn value)
{
// called many times but never concurrently for the same instance
}
public void Dispose()
{
// tear down state
}
}
类似于localInit
(对于.ctor
)和localFinally
(对于Dispose
)回调已经存在于TPL数据流库中吗?
我想避免使用ConcurrentStack<TransformBlockState>
(许多不必要的锁定),我想避免将TransformBlockState
存储在[ThreadStatic]
字段中(因为无法保证Task
不会在多个线程上运行(顺序显然)或多个Task
s单线程(可能是I / O上的所有阻塞))。
如果你想拥有一个有状态块TransformBlock
(或ActionBlock
),你可以创建一个创建块的函数,并将状态放在局部变量中并捕获它们:
private IPropagatorBlock<int,int> CreateMyBlock()
{
var state = 0;
return new TransformBlock<int,int>( x => x+state++ );
}
这样,您的类由编译器隐式创建。
没有相当于loclaInit
或localFinally
。您可以使用块管道创建类似的行为,或者如果这是昂贵的初始化,则可能使用连接池。但您可能需要重新考虑您的问题,而TPL-Dataflow可能不是最合适的。不知道更多关于确切问题的解决方法很难说。但通常任何一次初始化/每次输入都应该在流程之外完成并传入。
但就像我说你可以使用管道获得类似Parallel.Foreach
的东西,虽然它可能不是你想要的。
public class DataflowPipeline
{
private TransformBlock<IEnumerable<int>, IEnumerable<Locals>> Initialize { get; }
private TransformManyBlock<IEnumerable<Locals>, Locals> Distribute { get; }
private TransformBlock<Locals, Result> Compute { get; }
//other blocks, results, disposal etc.
public DataflowPipeline()
{
var sequential = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 };
var parallel = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 6 };
Initialize = new TransformBlock<IEnumerable<int>, IEnumerable<Locals>>(
inputs => inputs.Select(x => new Locals() { ExpensiveItem = string.Empty, Input = x }),
sequential);
Distribute = new TransformManyBlock<IEnumerable<Locals>, Locals>(x => x, sequential);
Compute = new TransformBlock<Locals, Result>(
local => new Result() { ExpensiveItem = local.ExpensiveItem, Output = local.Input * 2 },
parallel);
//Other blocks, link, complete etc.
}
}
我想我有一个更好的例子 - 我需要从航空公司获得几千张票据记录(实际上是GDS)。为此,我需要在发送SOAP或REST请求之前建立一个昂贵的会话。会话被限制,所以我真的不想为每张票创建一个新的。它使每个请求所需的时间翻倍,浪费金钱和资源。
创建自定义块似乎是解决方案,但实际上并不是那么好。数据流建立处理消息流的处理块的流水线。试图让它们以不同的方式工作将与数据流模型的基本假设发生冲突。
例如,任务用于并行,限制和负载平衡 - 在接收到最大数量的消息后,MaxMessagesPerTask选项会终止任务,这样一项任务就不会长时间占用CPU。创建和销毁每个任务的会话将破坏该机制并最终创建超过必要的会话。
池
处理此问题的一种方法是使用一个对象池,该对象池将由块使用的“昂贵”对象提供,在本例中为Sessions。令人生气的是,Microsoft.Extensions.ObjectPool套餐提供了这样一个游泳池。文档are non-existent,它们被欺骗性地放在ASP.NET
树中,但这是一个独立的.NET Standard 2.0软件包。 Github source看似简单,类使用Interlocked.CompareExchange来避免锁定。甚至还有一个LeakTrackingObjectPool实现。
如果我过去知道这件事,我可以写:
var pool = new DefaultObjectPool<Session>(new DefaultPooledObjectPolicy<Session>());
DefaultPooledObjectPolicy政策只是使用new
来创建一个新实例。虽然创建新策略很容易,例如使用自己的创建逻辑或工厂方法的策略:
public class SessionPolicy : DefaultPooledObjectPolicy<Session>
{
public override Session Create()
{
//Do whatever is needed here
return session;
}
}
重定向
另一种选择是使用多个块实例并使源块链接到所有这些实例。为避免将所有消息发送到第一个块,需要有限容量。假设我们有这种工厂方法:
TransformBlock<TIn,TOut> CreateThatBlockWithSession<TIn,TOut>(Settings someSettings)
{
var session=CreateSomeSessionFrom(someSettings);
var bounded=new DataflowBlockOptions {BoundedCapacity =1};
return new TransformBlock<TIn,TOut>(msg=>FunctionThatUses(msg,session),bounded);
}
并使用它来创建多个块:
_blocks=Enumerable.Range(0,10)
.Select(_=>CreateThatBlockWithSession(settings))
.ToArray();
源块可以连接到所有这些块:
foreach(var target in _blocks)
{
_source.LinkTo(target,options);
}
然后,将所有这些块链接到下一个块。这里棘手的部分是我们不能只传播完成。如果其中一个块完成,即使在其他块中有消息等待,它也会强制完成下一个块。
解决方案是使用qazxsw poi和qazxsw poi将完成传播到下一个块:
Task.WhenAll
一个更强大的实现将检查所有任务的ContinueWith
状态,如果其中一个失败,则在下一个块上调用foreach(var target in _blocks)
{
target.LinkTo(_nextBlock);
}
var allTasks=_blocks.Select(blk=>blk.Completion);
Task.WhenAll(allTasks)
.ContinueWith(_=>_nextBlock.Complete());