Observable.Create: CancellationToken never transitions to IsCancellationRequested


Take this small script (written in LINQPad, but it should run anywhere):

void Main()
{
    Task.Run(() => Worker()).Wait();
}

async Task Worker()
{
    if (SynchronizationContext.Current != null)
        throw new InvalidOperationException("Don't want any synchronization!");

    BaseClass provider = new Implementation();
    Func<IObserver<TimeSpan>, CancellationToken, Task> subscribeAsync =
        provider.CreateValues;
    var observable = Observable.Create(subscribeAsync);

    var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s
    cancellation.Register(() => Console.WriteLine("token is cancelled now"));
    await observable
        .Do(ts =>
        {
            Console.WriteLine("Elapsed: {0}; cancelled: {1}",
                ts,
                cancellation.IsCancellationRequested);
            cancellation.ThrowIfCancellationRequested();
        })
        .ToTask(cancellation)
        .ConfigureAwait(false);
}

abstract class BaseClass
{
    // allow implementers to use async-await
    public abstract Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation);
}

class Implementation : BaseClass
{
    // creates Values for 10s; entirely CPU-bound: no way for async-await hence return Task.CompletedTask
    public override Task CreateValues(IObserver<TimeSpan> observer, CancellationToken cancellation)
    {
        try
        {
            var sw = Stopwatch.StartNew();
            for (int i = 0; i < 10; i++)
            {
                for (int j = 0; j < 3; j++)
                {
                    Console.WriteLine("{0}/{1} cancelled:{2}", i, j, cancellation.IsCancellationRequested);
                    Thread.Sleep(333);
                }

                if (cancellation.IsCancellationRequested) // !! never gets true !!
                    throw new ApplicationException("token is cancelled");

                observer.OnNext(sw.Elapsed);
            }

            return Task.CompletedTask;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            throw;
        }
    }
}

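The method Implementation.CreateValues simply keeps running for the full 10 seconds instead of stopping after 5.5 seconds. The CancellationToken passed in by Observable.Create never even transitions to the cancelled state (the original token does, of course)!

Is this a bug? Or am I doing something wrong?

The output is: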

0/0 cancelled:False
0/1 cancelled:False
0/2 cancelled:False
Elapsed: 00:00:01.0205951; cancelled: False
1/0 cancelled:False
1/1 cancelled:False
1/2 cancelled:False
Elapsed: 00:00:02.0253279; cancelled: False
2/0 cancelled:False
2/1 cancelled:False
2/2 cancelled:False
Elapsed: 00:00:03.0274035; cancelled: False
3/0 cancelled:False
3/1 cancelled:False
3/2 cancelled:False
Elapsed: 00:00:04.0294796; cancelled: False
4/0 cancelled:False
4/1 cancelled:False
4/2 cancelled:False
Elapsed: 00:00:05.0315332; cancelled: False
5/0 cancelled:False
5/1 cancelled:False
token is cancelled now
5/2 cancelled:False
Elapsed: 00:00:06.0335601; cancelled: True
6/0 cancelled:False
6/1 cancelled:False
6/2 cancelled:False
Elapsed: 00:00:07.0436211; cancelled: True
7/0 cancelled:False
7/1 cancelled:False
7/2 cancelled:False
Elapsed: 00:00:08.0457921; cancelled: True
8/0 cancelled:False
8/1 cancelled:False
8/2 cancelled:False
Elapsed: 00:00:09.0477509; cancelled: True
9/0 cancelled:False
9/1 cancelled:False
9/2 cancelled:False
Elapsed: 00:00:10.0498751; cancelled: True
[AggregateException] at Main/Task.Wait()
c# .net system.reactive
3 Answers

6 votes

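The cancellation token passed to the subscribeAsync function is instantiated by the Observable.Create call; it is not the cancellation token you are instantiating yourself.

According to the summary of the Observable.Create overload:

Creates an observable sequence from a specified cancellable asynchronous Subscribe method. The CancellationToken passed to the asynchronous Subscribe method is tied to the returned disposable subscription, allowing best-effort cancellation.

In short, that cancellation token is cancelled when you dispose the subscription, not after the delay you specified.

You should be able to refactor the code as follows to make it work: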

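// The type argument is spelled out because it cannot be inferred from an untyped lambda.
Observable.Create<TimeSpan>(observer => subscribeAsync(observer, cancellation));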
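
To illustrate the point about the token being tied to the subscription, here is a minimal sketch. The class name SubscriptionTokenDemo, the method RunAsync, and the timings are made up for illustration; it assumes the System.Reactive NuGet package. It subscribes to an Observable.Create sequence, disposes the subscription after a short while, and reports whether the token handed in by Rx was cancelled at that moment.

```csharp
using System;
using System.Reactive.Linq;   // assumes the System.Reactive NuGet package
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question or of Rx.
static class SubscriptionTokenDemo
{
    // Returns true if the token that Rx passed to the subscribe delegate
    // was cancelled as a result of disposing the subscription.
    public static async Task<bool> RunAsync()
    {
        var sawCancellation = new TaskCompletionSource<bool>();

        var observable = Observable.Create<long>(async (observer, rxToken) =>
        {
            try
            {
                for (long i = 0; ; i++)
                {
                    // Task.Delay observes rxToken and throws once it is cancelled.
                    await Task.Delay(100, rxToken);
                    observer.OnNext(i);
                }
            }
            catch (OperationCanceledException)
            {
                sawCancellation.TrySetResult(rxToken.IsCancellationRequested);
            }
        });

        IDisposable subscription = observable.Subscribe(i => Console.WriteLine("value: " + i));

        await Task.Delay(350);
        subscription.Dispose(); // this is what cancels rxToken

        // Give the producer up to two seconds to notice the cancellation.
        var finished = await Task.WhenAny(sawCancellation.Task, Task.Delay(2000));
        return finished == sawCancellation.Task && sawCancellation.Task.Result;
    }
}
```

Calling await SubscriptionTokenDemo.RunAsync() should report that the token was cancelled, even though no CancellationTokenSource was created by the caller. Note that the producer here awaits between values; a producer that blocks synchronously inside the subscribe delegate, as in the question, behaves differently because Subscribe does not return while it runs.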

Hope that helps.


1 vote

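This is not really an answer to the question, but a rewrite of the sample code using System.Threading.Tasks.Dataflow instead of System.Reactive (too much code to post as a comment).

This has several advantages:

  1. Since the observer parameter now returns a Task, every implementation has something to await.
  2. If needed, the processing code that used to live in Do() (now in the ActionBlock) can itself be implemented asynchronously.
  3. Buffering is built in, if needed.
  4. It is decoupled, i.e. technology-agnostic: my interface is Func<TimeSpan, Task<bool>>, so it does not depend on Rx, TPL Dataflow, or anything else.

New code: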

void Main()
{
    Task.Run(() => Worker()).Wait();
    Console.WriteLine("DONE");
}

async Task Worker()
{
    if (SynchronizationContext.Current != null)
        throw new InvalidOperationException("Don't want any synchronization!");

    var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s
    cancellation.Register(() => Console.WriteLine("token is cancelled now"));

    var flow = new ActionBlock<TimeSpan>(
        async ts =>
        {
            Console.WriteLine("[START] Elapsed: {0}; cancelled: {1}", ts, cancellation.IsCancellationRequested);
            await Task.Delay(2500).ConfigureAwait(false); // processing takes more time than items need to produce
            Console.WriteLine("[STOP] Elapsed: {0}; cancelled: {1}", ts, cancellation.IsCancellationRequested);
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 2, // Buffer 1 item ahead
            EnsureOrdered = true,
            CancellationToken = cancellation,
        });

    Func<TimeSpan, Task<bool>> observer = ts => flow.SendAsync(ts, cancellation);

    BaseClass provider = new Implementation();
    await provider.CreateValues(observer, cancellation).ConfigureAwait(false);
    Console.WriteLine("provider.CreateValues done");

    flow.Complete();
    await flow.Completion.ConfigureAwait(false);
    Console.WriteLine("flow completed");
}

abstract class BaseClass
{
    // allow implementers to use async-await
    public abstract Task CreateValues(Func<TimeSpan, Task<bool>> observer, CancellationToken cancellation);
}

class Implementation : BaseClass
{
    public override async Task CreateValues(Func<TimeSpan, Task<bool>> observer, CancellationToken cancellation)
    {
        try
        {
            var sw = Stopwatch.StartNew();
            for (int i = 0; i < 10; i++)
            {
                for (int j = 0; j < 3; j++)
                {
                    Console.WriteLine("{0}/{1} cancelled:{2}", i, j, cancellation.IsCancellationRequested);
                    Thread.Sleep(333);
                }

                if (cancellation.IsCancellationRequested)
                    throw new ApplicationException("token is cancelled");

                var value = sw.Elapsed;
                var queued = await observer(value); // use of "observer" enforces async-await even if there is nothing else async
                Console.WriteLine("[{0}] '{1}' @ {2}", queued ? "enqueued" : "skipped", value, sw.Elapsed);

                if (!queued)
                {
                    // Dispose item
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            throw;
        }
    }
}

0 votes

This is how I would do it:

void Main()
{
    Task.Run(() => Worker()).Wait();
}

async Task Worker()
{
    if (SynchronizationContext.Current != null)
        throw new InvalidOperationException("Don't want any synchronization!");

    var cancellation = new CancellationTokenSource(5500).Token; // gets cancelled after 5.5s

    cancellation.Register(() => Console.WriteLine("token is cancelled now"));

    var until = Observable.Create<Unit>(o => cancellation.Register(() =>
    {
        o.OnNext(Unit.Default);
        o.OnCompleted();
    }));

    var observable =
    (
        Observable
            .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(333.0))
            .Zip(
                from i in Observable.Range(0, 10)
                from j in Observable.Range(0, 3)
                select $"{i}/{j} cancelled:{cancellation.IsCancellationRequested}",
                (_, x) => x)
    ).TakeUntil(until);
        
    await observable.Do(x => Console.WriteLine(x));
}

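The key is the until observable, which basically watches for cancellation, produces a Unit, and then completes. That lets you use the ordinary reactive operators.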
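
The same idea can be packaged as a reusable operator. The following is only a sketch: the extension method TakeUntilCancelled and its containing class are hypothetical names, not part of Rx, and it assumes the System.Reactive NuGet package. It wraps the until observable from the code above and applies TakeUntil to any source sequence.

```csharp
using System;
using System.Reactive;        // Unit
using System.Reactive.Linq;   // assumes the System.Reactive NuGet package
using System.Threading;

// Hypothetical helper class, not part of Rx.
static class CancellationObservableExtensions
{
    // Completes the resulting sequence once the given token is cancelled.
    public static IObservable<T> TakeUntilCancelled<T>(
        this IObservable<T> source, CancellationToken token)
    {
        // Emits a single Unit and completes when the token is cancelled.
        // The CancellationTokenRegistration returned by Register serves as the
        // subscription's IDisposable, so unsubscribing removes the callback.
        var until = Observable.Create<Unit>(o => token.Register(() =>
        {
            o.OnNext(Unit.Default);
            o.OnCompleted();
        }));

        return source.TakeUntil(until);
    }
}
```

With this in place, something like Observable.Interval(TimeSpan.FromMilliseconds(333)).TakeUntilCancelled(cancellation) should stop producing values once the token is cancelled, without the producer having to poll IsCancellationRequested.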
