TPL:如何拆分和合并数据流?

问题描述 投票:0回答:1

我正在尝试使用具有以下格式的tpl创建数据流:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    ...                             
                    -> LoadDataBlockN -> ProcessDataBlockN ->

[想法是,GetInputPathsBlock是一个块,它找到要加载的输入数据的路径,然后将该路径发送到每个LoadDataBlock。 LoadDataBlocks都是相同的(除了它们各自从GetInputPaths中接收到唯一的inputPath字符串)。然后将加载的数据发送到ProcessDataBlock,它进行一些简单的处理。然后,来自每个ProcessDataBlock的数据将发送到MergeDataBlock,然后将其合并并发送到SaveDataBlock,然后将其保存到文件中。

将其视为需要每月运行的数据流。首先,找到每天数据的路径。每天的数据都被加载和处理,然后在整个月中合并在一起并保存。每个月可以并行运行,一个月中每一天的数据可以并行加载和并行处理(在加载单日数据之后),并且当该月的所有内容加载并处理后,就可以合并并保存。

我尝试了什么

据我所知,TransformManyBlock<TInput,string>可用于拆分(GetInputPathsBlock,并可链接到普通TransformBlock<string,InputData>LoadDataBlock),并从那里链接到另一个TransformBlock<InputData,ProcessedData>([ C0]),但我不知道如何将其合并回单个块。

我看过的内容

我发现了ProcessDataBlock,它使用this answerTransformManyBlock转换为IEnumerable<item>,但我并不完全理解,也无法将itemTransformBlock<InputData,ProcessedData>)链接到一种ProcessDataBlock,所以我不知道如何使用。

我也看到了答案TransformBlock<IEnumerable<ProcessedData>>,ProcessedData>,建议使用like this,但是输入文件的数量N有所不同,并且无论如何都以相同的方式加载文件。

还有JoinBlock,它似乎可以满足我的要求,但是我不完全理解,而且我不知道如何将字典的设置转移到我的案子上。

如何拆分和合并数据流?

  • 我是否缺少块类型
  • 我可以以某种方式两次使用this answer吗?
  • tpl对分割/合并有意义吗,还是有一种更简单的异步/等待方式?
c# asynchronous async-await task-parallel-library
1个回答
0
投票

我将使用嵌套块来避免拆分每月数据,然后不得不再次合并它们。这是两个嵌套的TransformManyBlock的示例,它们处理2020年的所有日子:

TransformBlock

为了收集内部区块的日常结果,我使用了如下所示的扩展方法var monthlyBlock = new TransformBlock<int, List<string>>(async (month) => { var dailyBlock = new TransformBlock<int, string>(async (day) => { await Task.Delay(100); // Simulate async work return day.ToString(); }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 }); foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month))) await dailyBlock.SendAsync(day); dailyBlock.Complete(); var dailyResults = await dailyBlock.ToListAsync(); return dailyResults; }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); foreach (var month in Enumerable.Range(1, 12)) await monthlyBlock.SendAsync(month); monthlyBlock.Complete();

ToListAsync
© www.soinside.com 2019 - 2024. All rights reserved.