有没有Ix.NET(System.Interactive)的例子?

问题描述 投票:6回答:1

我有一个异步方法,比如说。

public async Task<T> GetAsync()
{

}

并将被调用从:

public async Task<IEnumerable<T>> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        yield return result;
    }
}

上面的语法是无效的,但基本上我是在寻找异步生成器。我知道它可以通过Observables来处理。我确实用Rx.NET做了实验,在一定程度上是可行的。但我试图避免它给代码库带来的复杂性,更重要的是上述需求本质上仍然不是一个反应式系统(我们的系统仍然是基于拉的)。比如,我只会在一定时间内监听传入的async流,而且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。

我可以这样反转方法签名。

public IEnumerable<Task<T>> GetAllAsync()

但这样做LINQ操作就会变得很麻烦,因为没有阻塞。我希望它是无阻塞的,以及不把整个事情加载到内存中。这个库。AsyncEnumerable 正是我所要找的,但是如何能用 Ix.NET? 我相信它们的目的是一样的。

换句话说,我如何利用Ix.NET来生成一个 IAsyncEnumerable 在处理 await? 喜欢。

public async IAsyncEnumerable GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        return // what?
    }
}
c# linq lazy-evaluation rx.net system.interactive
1个回答
5
投票

(已编辑)

使用NuGet的System.Linq.Async 4.0.0,现在你可以使用 SelectAwait.

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something
            .ToAsyncEnumerable()
            .SelectAwait(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

(过时)

使用NuGet的System.Interactive.Async 3.2.0,怎么样?目前 Select() 不支持async lambda,你必须自己实现它。

更好地支持异步--AsyncEnumerable的基于任务的重载。

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something.SelectAsync(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var enumerator = enumerable.GetEnumerator();
            var current = default(TResult);
            return AsyncEnumerable.CreateEnumerator(async c =>
                {
                    var moveNext = enumerator.MoveNext();
                    current = moveNext
                        ? await selector(enumerator.Current).ConfigureAwait(false)
                        : default(TResult);
                    return moveNext;
                },
                () => current,
                () => enumerator.Dispose());
        });
    }
}

扩展方法引自本示例。https:/github.commaca88AsyncGeneratorissues94#issuecomment-385286972。

© www.soinside.com 2019 - 2024. All rights reserved.