我正在寻找类似于
exhaustMap
的rxjs
运算符的东西,但是RX.NET
似乎没有这样的运算符。
我需要实现的是,在源流的每个元素上,我需要启动一个
async
处理程序,直到它完成,我想从源中删除任何元素。处理程序完成后,立即恢复获取元素。
我不想要的是在每个元素上启动一个异步处理程序——当处理程序运行时,我想删除源元素。
我还怀疑我需要在这里巧妙地使用 defer 运算符?
谢谢!
ExhaustMap
运算符的实现。源 observable 被投影到 IObservable<(Task<TResult>, int)>
,其中每个后续任务要么是前一个任务(如果它仍在运行),要么是与当前项目关联的新任务。然后使用 DistinctUntilChanged
运算符删除重复出现的相同任务,最后使用 Concat
运算符将可观察对象展平。
/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> function)
{
return source
.Scan((Task: Task.FromResult<TResult>(default), Id: 0), (previous, item) =>
!previous.Task.IsCompleted ? previous : (function(item), unchecked(previous.Id + 1)))
.DistinctUntilChanged()
.Select(e => e.Task)
.Concat();
}
function
返回的任务不保证是不同的。例如,方法 async Task<T> Return<T>(T result) => result;
对于 Task
或 result = 1
始终返回相同的 result = false
。因此,在上述实现中需要增加 Id
,使任务个性化,这样 DistinctUntilChanged
就不会从单独的 function
调用中过滤掉任务。
用法示例:
Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => (int)x + 1)
.Take(10)
.Do(x => Console.WriteLine($"Produced #{x}"))
.ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
.Do(x => Console.WriteLine($"--Result: {x}"))
.Wait();
输出:
Produced #1
--Result: 1
Produced #2
--Result: 2
Produced #3
Produced #4
Produced #5
--Result: 3
Produced #6
Produced #7
Produced #8
--Result: 6
Produced #9
Produced #10
在线演示.
这里是
ExhaustMap
的替代实现,其中 function
产生 IObservable<TResult>
而不是 Task<TResult>
:
/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, IObservable<TResult>> function)
{
return Observable.Defer(() =>
{
int mutex = 0; // 0: not acquired, 1: acquired
return source.SelectMany(item =>
{
// Attempt to acquire the mutex immediately. If successful, return
// a sequence that releases the mutex when terminated. Otherwise,
// return immediately an empty sequence.
if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
return function(item).Finally(() => Volatile.Write(ref mutex, 0));
return Observable.Empty<TResult>();
});
});
}