我想知道每次Observable是否可以启动任务,并使用这些任务的结果继续管道。结果的顺序无关紧要。我正在使用C#。
linksObservable
.Select(url=> downloadTask(url))
...
上面的代码将启动下载URL的任务但是如何获得结果,因为它们可用(即downloadTask已完成)。另一个考虑因素是来自任务的Exceptions
。任何异常都不应影响其余任务。
好吧,如果downloadTask
返回你可以做的事情
observable
.SelectMany(url => downloadTask(url))
.Subscribe(result => Console.WriteLine(result));
完整的例子:
void Main()
{
var observable = new Subject<string>();
observable
.SelectMany(url => downloadTask(url))
.Subscribe(result => Console.WriteLine(result));
observable.OnNext("a");
observable.OnNext("b");
observable.OnNext("c");
observable.OnNext("d");
}
public async Task<string> downloadTask(string s)
{
await Task.Delay(1000);
return s;
}