Dataflow TPL 实现带前提条件的流水线

问题描述 投票:1回答:1

我有一个关于使用Dataflow TPL库实现流水线的问题。

我的情况是,我有一个软件,需要同时处理一些任务。处理过程是这样的:首先我们在全局级别处理相册,然后我们进入相册内部,单独处理每张图片。假设应用程序有处理槽,并且它们是可配置的(为了举例,假设槽=2)。这意味着应用程序可以处理以下两种情况

a)两个相册同时处理 b)一个相册+一个不同相册的照片 c)同一个相册同时处理两张照片 d)不同相册同时处理两张照片

目前我实现了这样的流程。

var albumTransferBlock = new TransformBlock<Album, Album>(ProcessAlbum,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

ActionBlock<Album> photoActionBlock = new ActionBlock<Album>(ProcessPhoto);

albumTransferBlock.LinkTo(photoActionBlock);


Album ProcessAlbum(Album a)
{
    return a;
}

void ProcessPhoto(Album album)
{
    foreach (var photo in album)
    {
        // do some processing
    }
}

我的问题是,当我一次处理一个相册时,应用程序永远不会使用两个插槽来处理照片。除了c)之外,它符合所有要求

谁能帮我解决这个问题,使用DataFlow TPL?

c# task-parallel-library pipeline dataflow tpl-dataflow
1个回答
0
投票

我想我可以自己回答。我的做法是

1)我创建了一个接口IProcessor和方法Process()2)用接口IProcessor包装AlbumProcessing和PhotoProcessing3)创建了一个ActionBlock,它把IProcessor作为Input并执行Process方法。

4) 在处理相册的最后,我将所有照片的处理加入到ActionBlock中。

这100%满足了我的要求。也许有人有其他的解决方案?


0
投票

你可以使用 TransformManyBlock 用于处理相册,连接到一个 ActionBlock 用于处理照片,这样每个相册在处理它的照片之前就被处理了。对于施加超出单个块边界的并发限制,您可以使用有限并发的 TaskSchedulerSemaphoreSlim. 第二种方案更加灵活,因为它也允许对异步操作进行节流。在你的情况下,所有的操作都是同步的,所以你可以自由选择其中一种方法。在这两种情况下,你仍然应该配置 MaxDegreeOfParallelism 选项的块的最大并发量限制,否则--如果您将它们设置为 无限- 处理顺序会变得过于随机。

下面是一个例子 TaskScheduler 办法。它使用的是 ConcurrentScheduler 的财产 ConcurrentExclusiveSchedulerPair 类的例子。

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    TaskScheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,
        maxConcurrencyLevel: 2).ConcurrentScheduler
};

var albumsBlock = new TransformManyBlock<Album, Photo>(album =>
{
    ProcessAlbum(album);
    return album.Photos;
}, options);

var photosBlock = new ActionBlock<Photo>(photo =>
{
    ProcessPhoto(photo);
}, options);

albumsBlock.LinkTo(photosBlock);

这里是一个例子 SemaphoreSlim 办法。使用 WaitAsync 方法,而不是 Wait 的优点是,等待获取信号体的过程会发生。异步所以,没有 ThreadPool 线程将被无谓的封锁。

var throttler = new SemaphoreSlim(2);

var albumsBlock = new TransformManyBlock<Album, Photo>(async album =>
{
    await throttler.WaitAsync();
    try
    {
        ProcessAlbum(album);
        return album.Photos;
    }
    finally { throttler.Release(); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

var photosBlock = new ActionBlock<Photo>(async photo =>
{
    await throttler.WaitAsync();
    try
    {
        ProcessPhoto(photo);
    }
    finally { throttler.Release(); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

albumsBlock.LinkTo(photosBlock);
© www.soinside.com 2019 - 2024. All rights reserved.