我有一个异步方法,比如说。
public async Task<T> GetAsync()
{
}
并将被调用从:
public async Task<IEnumerable<T>> GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
yield return result;
}
}
上面的语法是无效的,但基本上我是在寻找异步生成器。我知道它可以通过Observables来处理。我确实用Rx.NET做了实验,在一定程度上是可行的。但我试图避免它给代码库带来的复杂性,更重要的是上述需求本质上仍然不是一个反应式系统(我们的系统仍然是基于拉的)。比如,我只会在一定时间内监听传入的async流,而且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。
我可以这样反转方法签名。
public IEnumerable<Task<T>> GetAllAsync()
但这样做LINQ操作就会变得很麻烦,因为没有阻塞。我希望它是无阻塞的,以及不把整个事情加载到内存中。这个库。AsyncEnumerable 正是我所要找的,但是如何能用 Ix.NET? 我相信它们的目的是一样的。
换句话说,我如何利用Ix.NET来生成一个 IAsyncEnumerable
在处理 await
? 喜欢。
public async IAsyncEnumerable GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
return // what?
}
}
(已编辑)
使用NuGet的System.Linq.Async 4.0.0,现在你可以使用 SelectAwait
.
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something
.ToAsyncEnumerable()
.SelectAwait(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}
(过时)
使用NuGet的System.Interactive.Async 3.2.0,怎么样?目前 Select()
不支持async lambda,你必须自己实现它。
更好地支持异步--AsyncEnumerable的基于任务的重载。
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something.SelectAsync(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}
static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
{
return AsyncEnumerable.CreateEnumerable(() =>
{
var enumerator = enumerable.GetEnumerator();
var current = default(TResult);
return AsyncEnumerable.CreateEnumerator(async c =>
{
var moveNext = enumerator.MoveNext();
current = moveNext
? await selector(enumerator.Current).ConfigureAwait(false)
: default(TResult);
return moveNext;
},
() => current,
() => enumerator.Dispose());
});
}
}
扩展方法引自本示例。https:/github.commaca88AsyncGeneratorissues94#issuecomment-385286972。