[我正在尝试在我的Xamarin移动应用中引入Rx,我想在应用启动时的登录阶段将一系列呼叫链接在一起。
TL; DR;如何一个接一个地运行2/3个可观察对象,以检索数据并正确设置线程。
更详细地说,我执行以下操作:
LoginUser() : IObservable<User>
RetrieveRemoteA() : IObservable<A>
或RetrieveRemoteB() : IObservable<B>
A
或B
,我将更新UI。这是一种描述流程的图(如上所述):
考虑到我要避免从上一个的Subscribe()
内调用新的可观察对象(源),这是我实现的用于检索数据和更新UI的内容(图像中的流已在下面的代码)。
IObservable<User> loginUserObservable = LogInUser(currentUser);
loginUserObservable
.SubscribeOn(ThreadPoolScheduler.Instance)
.SelectMany(
(user) =>
{
if (user.Type == UserType.A)
return RetrieveRemoteA(user.UserId); // outputs IObservable<A>
return Observable.Return(new A());
},
(user, a) =>
{
B b = null;
return new { user, a, b }; // Create anonymous type to keet track of 'user'
})
.SelectMany(
(xType) =>
{
if (xType.user.Type == UserType.B)
return RetrieveRemoteB(xType.user.UserId); // outputs IObservable<B>
return Observable.Return(new B());
},
(xType, bData) =>
{
var user = xType.user;
var a = xType.a;
var b = b;
return new { user, a, b };
})
.ObserveOn(ImmediateScheduler.Instance)
.Select((xType) =>
{
if (xType.user.Type == UserType.A)
{
A a = xType.a;
B b = null;
return new { a, b };
}
else {
A a = null;
B b = xType.b;
return new { a, b };
}
})
.Subscribe((result) =>
{
if (result.a != null)
{
Console.WriteLine($"ID: {result.a.Id}");
}
else {
Console.WriteLine($"ID: {result.b.Id}");
}
});
一旦运行,即使方法完成没有错误,似乎流也会堆积在RetrieveRemoteA(user.UserId)
中。
public IObservable<A> RetrieveRemoteA(string userId)
{
return Observable.FromAsync<A>(async () =>
{
A a = await CustomAPI(userId)
return a;
}
}
[当我通过调用上一个订阅中的每个新可观察对象来顺序实现该流程时,它可以正常工作(但这不是正确的方法)。>>
我认为这是线程问题,或者是我的Rx实现错误。
请问您有什么线索吗?
我正在尝试在我的Xamarin移动应用程序中引入Rx,我想在应用程序启动的登录阶段链接一系列的呼叫。 TL; DR;如何依次运行2/3个可观察对象...
我认为您可以将其折叠为以下内容。 Reactive.Extensions可以将异步方法视为可观察到的单个返回,因此无需使用RetrieveRemoteA
方法包装异步调用。我们可以利用SelectMany
中的行为,将其调出到API并检索您的用户信息,并等待响应,然后再通知下游。