向所有使用者发送TPL数据流重复消息

问题描述 投票:2回答:1

我目前正在使用WPF和TPL Dataflow编写应执行以下操作的应用程序:

  1. 将所有文件加载到目录中
  2. 一旦开始处理,将内容记录到ui并处理每个文件
  3. 完成后,将某些内容记录到ui

问题是,登录到UI的操作需要在UI线程中进行,并且仅在开始处理之前进行记录。

我现在能够做到这一点的唯一方法是从TPL Transform块内部手动调用调度程序并更新UI:

Application.Current.Dispatcher.Invoke(new Action(() =>
{
    ProcessedFiles.Add(optimizedFileResult);
}));

我想通过DataFlow块执行此操作,尽管该块使用以下方法在UI线程上运行:

ExecutionDataflowBlockOptions.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

但是,如果我在进行优化的块上对此进行设置,则优化也将在单线程上运行。

另一方面,如果我要在Processing块之前创建一个新块并在其中调用它。在实际开始之前,它将开始说“处理”方式。

样本代码

我创建了一些示例代码来重现此问题:

public class TplLoggingToUiIssue
    {
        public TplLoggingToUiIssue()
        {

        }

        public IEnumerable<string> RecurseFiles()
        {
            for (int i = 0; i < 20; i++)
            {
                yield return i.ToString();
            }
        }

        public async Task Go()
        {
            var block1 = new TransformBlock<string, string>(input =>
            {
                Console.WriteLine($"1: {input}");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity = 10,
                EnsureOrdered = false
            });

            var block2 = new TransformBlock<string, string>(input =>
            {
                Console.WriteLine($"2: {input}\t\t\tStarting {input} now (ui logging)");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                //TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(), (Doesn't work in Console app, but you get the idea)
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1,
                EnsureOrdered = false
            });


            var block3 = new TransformBlock<string, string>(async input =>
            {
                Console.WriteLine($"3 start: {input}");
                await Task.Delay(5000);
                Console.WriteLine($"3 end: {input}");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 2,
                BoundedCapacity = 10,
                EnsureOrdered = false
            });

            var block4 = new ActionBlock<string>(input =>
            {
                Console.WriteLine($"4: {input}");
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1,
                EnsureOrdered = false
            });


            block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
            block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
            block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });


            var files = RecurseFiles();
            await Task.Run(async () =>
            {
                foreach (var file in files)
                {
                    Console.WriteLine($"Posting: {file}");
                    var result = await block1.SendAsync(file);

                    if (!result)
                    {
                        Console.WriteLine("Result is false!!!");
                    }
                }
            });

            Console.WriteLine("Completing");
            block1.Complete();
            await block4.Completion;
            Console.WriteLine("Done");
        }
    }

如果运行此示例(只有6个“文件”),您将获得以下输出:

Posting: 0
Posting: 1
Posting: 2
Posting: 3
Posting: 4
Posting: 5
1: 2
1: 1
1: 3
1: 0
1: 4
1: 5
2: 2                    Starting 2 now (ui logging)
Completing
3 start: 2
2: 0                    Starting 0 now (ui logging)
3 start: 0
2: 3                    Starting 3 now (ui logging)
2: 1                    Starting 1 now (ui logging)
2: 4                    Starting 4 now (ui logging)
2: 5                    Starting 5 now (ui logging)
3 end: 2
3 end: 0
3 start: 3
3 start: 1
4: 2
4: 0
3 end: 3
3 end: 1
4: 3
3 start: 4
3 start: 5
4: 1
3 end: 5
3 end: 4
4: 5
4: 4
Done

从此输出中可以看到,它的开始记录太早了。我还改用了Broadcast块,但这会覆盖值,使它们丢失。

理想的情况是使日志记录块以某种方式等待,直到处理块具有容量,然后再推送一项。

c# .net task-parallel-library
1个回答
0
投票
这里是一种人为设计的方法,通过启动完成的事件来增强异步lambda作为ActionBlock的参数的传递。

public static Func<TInput, Task> Enhance<TInput>( Func<TInput, Task> action, Action<TInput> onActionStarted = null, Action<TInput> onActionFinished = null, ISynchronizeInvoke synchronizingObject = null) { return async (item) => { RaiseEvent(onActionStarted, item, synchronizingObject); await action(item).ConfigureAwait(false); RaiseEvent(onActionFinished, item, synchronizingObject); }; } private static void RaiseEvent<T>(Action<T> onEvent, T arg1, ISynchronizeInvoke synchronizingObject) { if (onEvent == null) return; if (synchronizingObject != null && synchronizingObject.InvokeRequired) { synchronizingObject.Invoke(onEvent, new object[] { arg1 }); } else { onEvent(arg1); } }

用法示例:

private void Form_Load(object sender, EventArgs e) { var block = new ActionBlock<string>(Enhance<string>(async item => { await Task.Delay(5000); // Simulate some lengthy asynchronous job }, onActionStarted: item => { this.Text = $"{item} started"; }, onActionFinished: item => { ListBoxCompleted.Items.Add(item); }, synchronizingObject: this), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 2, BoundedCapacity = 10, EnsureOrdered = false }); }

onActionStartedonActionFinished回调将在UI线程中为每个处理的项目调用一次。
© www.soinside.com 2019 - 2024. All rights reserved.