IAsyncEnumerable 转换为 IObservable(和任务)未启动

问题描述 投票:0回答:1

我的

BackgroundService
中有以下代码:

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
    await GetElements(cancellationToken)
        .ToObservable()
        .Buffer(TimeSpan.FromMinutes(1), 100)
        .Select(elements => Observable.FromAsync(() => ProcessElementsAsync(
            elements, cancellationToken)))
        .Concat()
        .ToTask(cancellationToken);
}

private async IAsyncEnumerable<Element> GetElements(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)  // <-- Breakpoint is here
    {
        yield return await _queue.DequeueElementAsync(cancellationToken);
    }
}

当我运行应用程序时,永远不会命中断点。看起来好像

GetElements
从未真正执行过。

当我用此代码替换

ExecuteAsync
时,
GetElements
中的断点确实被击中:

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
    GetElements(cancellationToken)
        .ToObservable()
        .Subscribe(val =>
        {
            _ = 1;
        });

    await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}

第一次实施有什么问题?

c# observable system.reactive rx.net iasyncenumerable
1个回答
0
投票

这可能与您的阅读方式有关

GetElements
。尝试类似的东西
await foreach (var element in GetElements())

我正在运行单元测试并尝试达到断点。

    public async IAsyncEnumerable<bool> SomeFunc()
    {
        yield return true;
    }

    public async Task SomeFunc2()
    {
        await AwaitableFunc();
        return;
    }

如果我这样调用的话,我只能在 SomeFunc 中命中断点

await foreach (var item in SomeFunc()){}

这样调用不会断点

SomeFunc()

IAsyncEnumerable 函数的代码不会在调用该方法后立即执行。相反,它仅在您开始迭代可枚举时才开始执行。

© www.soinside.com 2019 - 2024. All rights reserved.