为了示例,假设我决定从以下内容开始:
var startingSequence = await GetLastSequence(database);
return Observable.Create<ExternalSourceChange>(async (observer) =>
{
var currentSequence = startingSequence;
var done = false;
do
{
var changes = await CreateRequest()
.AppendPathSegment(database)
.AppendPathSegment("_changes")
.SetQueryParams(new
{
since = currentSequence,
include_docs = "true",
limit = 1,
})
.GetJsonAsync<ExternalSourceChangeJson>();
// note: If we've already processed past this sequence, skip it.
if (string.CompareOrdinal(changes.last_seq, currentSequence) <= 0)
{
continue;
}
var result = changes.results.First();
var deleted = result.doc.GetValue("_deleted")?.Value<bool>() ?? false;
var databaseChange = new ExternalSourceChangeJson(result.id, result.doc.ToString())
{
Deleted = deleted,
};
observer.OnNext(databaseChange);
currentSequence = ((List<ExternalSourceChangeListJson>)changes
.results)
.OrderBy((result) => result.seq)
.LastOrDefault()?.seq
?? currentSequence;
done = changes.pending == 0;
} while (! done);
await SetLastSequence(database, currentSequence);
observer.OnCompleted();
// note: Not sure I have anything to do here.
return () => {};
});
上面有什么我可以做的事情,可以利用它来利用更多的system.reactive
?
Changes<dynamic>
在每次迭代中产生currentSequence
和done
到目前为止是我觉得可以做得更好的两件事使用Observable.Create
不是这里的方法。如果您发现自己返回了return () => {};
(或您经常看到的return Disposable.Empty;
),则说明您做错了。
几乎总是有一种使用内置运算符来获得良好的弹性查询的方法。
现在,由于查询中发生了很多事情,我无法轻易复制,因此,我将为您做一个简单的示例,说明您将如何做自己想要的事情。
首先,这是我从数据库中获取某些东西的极大简化:
private static int __counter = 0;
public Task<int> GetCounterAsync() => Task.Run(() => __counter++);
现在,我将根据您的代码编写错误的方法,以便您可以看到我的最终代码将如何与您的代码联系起来。我的目标是从“数据库”中检索值,直到得到10
并完成该值。
IObservable<int> query =
Observable.Create<int>(async observer =>
{
var done = false;
do
{
var counter = await GetCounterAsync();
observer.OnNext(counter);
done = counter == 10;
} while (!done);
observer.OnCompleted();
return () => { };
});
这是为了看起来像您的代码。我想强调,这是不是这样做的方法。
这是正确的方法。
IObservable<int> query =
Observable
.Defer(() => Observable.FromAsync(() => GetCounterAsync()))
.Repeat()
.TakeUntil(x => x == 10);
Observable.Defer
很重要,因为它每次触发Observable.FromAsync(() => GetCounterAsync())
运算符时都会导致重新调用.Repeat()
。没有它,第一次调用Observable.FromAsync(() => GetCounterAsync())
的结果将无限期重复。
现在,如果您需要在查询中包括状态,这通常是人们使用Observable.Create
的原因,那么您始终可以将整个内容包装在另一个Observable.Defer
中。
IObservable<int> query =
Observable
.Defer(() =>
{
var finish = 10;
return
Observable
.Defer(() => Observable.FromAsync(() => GetCounterAsync()))
.Repeat()
.TakeUntil(x => x == finish);
});
通过这种方式,将为每个订户再次创建查询中所需的任何状态。
如果您接下来需要对状态做任何事情,则可以执行此操作:
IObservable<int> query =
Observable
.Defer(() =>
{
var finish = 10;
return
Observable
.Defer(() => Observable.FromAsync(() => GetCounterAsync()))
.Repeat()
.TakeUntil(x => x == finish)
.Finally(() => Console.WriteLine($"Finished at {finish}"));
});
query.Subscribe(x => Console.WriteLine(x));
产生:
01个234567891010完成
让我知道这是否有帮助或您的问题中是否缺少我。
在这里,您可以通过包括错误处理和对取消订阅的反应来使它更强大。
return Observable.Create<ExternalSourceChange>(observer =>
{
var cts = new CancellationTokenSource();
var fireAndForget = Task.Run(async () =>
{
try
{
while (true)
{
cts.Token.ThrowIfCancellationRequested();
var changes = await CreateRequest().GetJsonAsync();
//...
observer.OnNext(databaseChange);
//...
if (changes.pending == 0) break;
}
observer.OnCompleted();
}
catch (OperationCanceledException ex) when (ex.CancellationToken == cts.Token)
{
// Do nothing (the subscriber unsubscribed)
}
catch (Exception ex)
{
observer.OnError(ex);
}
});
return System.Reactive.Disposables.Disposable.Create(() =>
{
cts.Cancel();
});
});
您可以将cts.Token
传递给您调用的任何接受取消令牌的异步方法,以更快地终止循环。
Update:我刚刚注意到Observable.Create
方法的另一个重载,该方法接受带有CancellationToken
参数的异步委托:
// Summary:
// Creates an observable sequence from a specified cancellable asynchronous Subscribe
// method. The CancellationToken passed to the asynchronous Subscribe method is
// tied to the returned disposable subscription, allowing best-effort cancellation.
public static IObservable<TResult> Create<TResult>(
Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync);
这可以用来简化事情,方法是取消即发即弃任务,最后删除CancellationTokenSource
和Disposable.Create
。
return Observable.Create<ExternalSourceChange>(async (observer, cancellationToken) =>
{ // etc...