如何在TPL数据流中重置推迟的拒绝消息?

问题描述 投票:3回答:1

我正在使用TDF来处理我的应用程序,目前效果很好,不幸的是,我遇到了一个特殊的问题,似乎不能直接用现有的数据流机制来处理。

我有N个生产者(在这种情况下是BufferBlocks),它们都只连接到1个(都是同一个)ActionBlock。这个区块每次总是处理1个项目,也只有1个项目的容量。

对于从生产者到ActionBlock的链接,我还想添加一个过滤器,但这里的特殊情况是,过滤器的条件可以独立于被处理的物品而改变,而且物品不能被丢弃!所以基本上我想处理所有的物品,但订单时间可以改变何时处理一个物品。

不幸的是,我了解到,如果一个项目被 "拒绝 "一次->过滤条件评估为false,如果这个项目没有传递给另一个块(例如NullTarget),目标块不会重试相同的项目(也不会重新评估过滤器)。

public class ConsumeTest
  {
    private readonly BufferBlock<int> m_bufferBlock1;
    private readonly BufferBlock<int> m_bufferBlock2;
    private readonly ActionBlock<int> m_actionBlock;

    public ConsumeTest()
    {
      m_bufferBlock1 = new BufferBlock<int>();
      m_bufferBlock2 = new BufferBlock<int>();

      var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 };
      m_actionBlock = new ActionBlock<int>((item) => BlockAction(item), options);

      var start = DateTime.Now;
      var elapsed = TimeSpan.FromMinutes(1);

      m_bufferBlock1.LinkTo(m_actionBlock, x => IsTimeElapsed(start, elapsed));
      m_bufferBlock2.LinkTo(m_actionBlock);

      FillBuffers();
    }

    private void BlockAction(int item)
    {
      Console.WriteLine(item);
      Thread.Sleep(2000);
    }

    private void FillBuffers()
    {
      for (int i = 0; i < 1000; i++)
      {
        if (i % 2 == 0)
        {
          m_bufferBlock1.Post(i);
        }
        else
        {
          m_bufferBlock2.Post(i);
        }
      }
    }

    private bool IsTimeElapsed(DateTime start, TimeSpan elapsed)
    {
      Console.WriteLine("checking time elapsed");
      return DateTime.Now > (start + elapsed);
    }

    public async Task Start()
    {
      await m_actionBlock.Completion;
    }
  }

代码设置了一个测试管道,并在两个缓冲区中填充奇数和偶数。两个BufferBlock都连接到一个单一的ActionBlock,该ActionBlock只打印 "已处理 "的数字,并等待2秒。

在m_bufferBlock1和m_actionBlock之间的过滤条件是检查(为了测试目的)自我们开始整个过程以来是否已经过了1分钟。

如果我们运行这个,它会产生以下输出。

1
checking time elapsed
3
5
7
9
11
13
15
17
19

如我们所见,ActionBlock从BufferBlock中提取第一个元素,没有过滤,然后尝试从BufferBlock中提取一个带过滤的元素。过滤器评估为false,它继续从没有过滤器的块中获取所有元素。

我的期望是,在处理完来自没有过滤器的BufferBlock的元素后,它再次尝试从另一个带过滤器的BufferBlock中取出元素,并再次评估它。

这将是我所期望的(或者说是想要的)结果。

1
checking time elapsed
3
checking time elapsed
5
checking time elapsed
7
checking time elapsed
9
checking time elapsed
11
checking time elapsed
13
checking time elapsed
15
// after timer has elapsed take elements also from other buffer
2
17
4
19

我现在的问题是,有没有办法 "重设 "已经被 "拒绝 "的消息,让它再次被评估,或者有其他方法通过不同的建模来实现?概括地说,他们真的从两个Buffers中严格交替拉出并不重要!(因为我知道这是调度的问题)。(因为我知道这取决于调度,而且如果从同一区块中的2个项目不时地被dequeued,那是完全没有问题的)但重要的是,"被拒绝 "的消息不能被丢弃或重新排队,因为一个缓冲区内的顺序是很重要的。

先谢谢你

c# task-parallel-library pipeline tpl-dataflow
1个回答
0
投票

一个想法是刷新两个区块之间的链接,周期性或按需刷新。实现一个周期性刷新的 LinkTo 并不是很困难。这里是一个实现。

public static IDisposable LinkTo<TOutput>(this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target, Predicate<TOutput> predicate,
    TimeSpan refreshInterval, DataflowLinkOptions linkOptions = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (target == null) throw new ArgumentNullException(nameof(target));
    if (predicate == null) throw new ArgumentNullException(nameof(predicate));
    if (refreshInterval < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(refreshInterval));
    linkOptions = linkOptions ?? new DataflowLinkOptions();

    var locker = new object();
    var cts = new CancellationTokenSource();
    var token = cts.Token;
    var currentLink = source.LinkTo(target, linkOptions, predicate);
    var loopTask = Task.Run(async () =>
    {
        try
        {
            while (true)
            {
                await Task.Delay(refreshInterval, token).ConfigureAwait(false);
                currentLink.Dispose();
                currentLink = source.LinkTo(target, linkOptions, predicate);
            }
        }
        finally
        {
            lock (locker) { cts.Dispose(); cts = null; }
        }
    }, token);

    _ = Task.Factory.ContinueWhenAny(new[] { source.Completion, target.Completion },
        _ => { lock (locker) cts?.Cancel(); }, token, TaskContinuationOptions.None,
        TaskScheduler.Default);

    return new Unlinker(() =>
    {
        lock (locker) cts?.Cancel();
        // Wait synchronously the task to complete, ignoring cancellation exceptions.
        try { loopTask.GetAwaiter().GetResult(); } catch (OperationCanceledException) { }
        currentLink.Dispose();
    });
}

private struct Unlinker : IDisposable
{
    private readonly Action _action;
    public Unlinker(Action disposeAction) => _action = disposeAction;
    void IDisposable.Dispose() => _action?.Invoke();
}

用例:

m_bufferBlock1.LinkTo(m_actionBlock, x => IsTimeElapsed(start, elapsed),
    refreshInterval: TimeSpan.FromSeconds(10));

在这两者之间的联系 m_bufferBlock1m_actionBlock 将每10秒刷新一次,直到两个区块中的一个完成。

© www.soinside.com 2019 - 2024. All rights reserved.