我正在尝试使用具有以下格式的tpl创建数据流:
-> LoadDataBlock1 -> ProcessDataBlock1 ->
GetInputPathsBlock -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
-> LoadDataBlock3 -> ProcessDataBlock3 ->
...
-> LoadDataBlockN -> ProcessDataBlockN ->
[想法是,GetInputPathsBlock
是一个块,它找到要加载的输入数据的路径,然后将该路径发送到每个LoadDataBlock
。 LoadDataBlocks都是相同的(除了它们各自从GetInputPaths中接收到唯一的inputPath字符串)。然后将加载的数据发送到ProcessDataBlock
,它进行一些简单的处理。然后,来自每个ProcessDataBlock
的数据将发送到MergeDataBlock
,然后将其合并并发送到SaveDataBlock
,然后将其保存到文件中。
将其视为需要每月运行的数据流。首先,找到每天数据的路径。每天的数据都被加载和处理,然后在整个月中合并在一起并保存。每个月可以并行运行,一个月中每一天的数据可以并行加载和并行处理(在加载单日数据之后),并且当该月的所有内容加载并处理后,就可以合并并保存。
我尝试了什么
据我所知,TransformManyBlock<TInput,string>
可用于拆分(GetInputPathsBlock
,并可链接到普通TransformBlock<string,InputData>
(LoadDataBlock
),并从那里链接到另一个TransformBlock<InputData,ProcessedData>
([ C0]),但我不知道如何将其合并回单个块。
我看过的内容
我发现了ProcessDataBlock
,它使用this answer从TransformManyBlock
转换为IEnumerable<item>
,但我并不完全理解,也无法将item
(TransformBlock<InputData,ProcessedData>
)链接到一种ProcessDataBlock
,所以我不知道如何使用。
我也看到了答案TransformBlock<IEnumerable<ProcessedData>>,ProcessedData>
,建议使用like this,但是输入文件的数量N有所不同,并且无论如何都以相同的方式加载文件。
还有JoinBlock
,它似乎可以满足我的要求,但是我不完全理解,而且我不知道如何将字典的设置转移到我的案子上。
如何拆分和合并数据流?
我将使用嵌套块来避免拆分每月数据,然后不得不再次合并它们。这是两个嵌套的TransformManyBlock
的示例,它们处理2020年的所有日子:
TransformBlock
为了收集内部区块的日常结果,我使用了如下所示的扩展方法var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
var dailyBlock = new TransformBlock<int, string>(async (day) =>
{
await Task.Delay(100); // Simulate async work
return day.ToString();
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });
foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
await dailyBlock.SendAsync(day);
dailyBlock.Complete();
var dailyResults = await dailyBlock.ToListAsync();
return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
foreach (var month in Enumerable.Range(1, 12))
await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();
:
ToListAsync