我如何使用system.reactive在每次迭代中使用值的情况下执行do?

问题描述 投票:0回答:2

为了示例,假设我决定从以下内容开始:

var startingSequence = await GetLastSequence(database);

return Observable.Create<ExternalSourceChange>(async (observer) =>
{
    var currentSequence = startingSequence;

    var done = false;
    do
    {
        var changes = await CreateRequest()
            .AppendPathSegment(database)
            .AppendPathSegment("_changes")
            .SetQueryParams(new
            {
                since = currentSequence,
                include_docs = "true",
                limit = 1,
            })
            .GetJsonAsync<ExternalSourceChangeJson>();

        // note: If we've already processed past this sequence, skip it.
        if (string.CompareOrdinal(changes.last_seq, currentSequence) <= 0)
        {
            continue;
        }

        var result = changes.results.First();
        var deleted = result.doc.GetValue("_deleted")?.Value<bool>() ?? false;

        var databaseChange = new ExternalSourceChangeJson(result.id, result.doc.ToString())
        {
            Deleted = deleted,
        };

        observer.OnNext(databaseChange);

        currentSequence = ((List<ExternalSourceChangeListJson>)changes
            .results)
            .OrderBy((result) => result.seq)
            .LastOrDefault()?.seq
            ?? currentSequence;

        done = changes.pending == 0;

    } while (! done);

    await SetLastSequence(database, currentSequence);

    observer.OnCompleted();

    // note: Not sure I have anything to do here.
    return () => {};
});

上面有什么我可以做的事情,可以利用它来利用更多的system.reactive

  • Changes<dynamic>在每次迭代中产生
  • [currentSequencedone到目前为止是我觉得可以做得更好的两件事
c# system.reactive
2个回答
2
投票

使用Observable.Create不是这里的方法。如果您发现自己返回了return () => {};(或您经常看到的return Disposable.Empty;),则说明您做错了。

几乎总是有一种使用内置运算符来获得良好的弹性查询的方法。

现在,由于查询中发生了很多事情,我无法轻易复制,因此,我将为您做一个简单的示例,说明您将如何做自己想要的事情。

首先,这是我从数据库中获取某些东西的极大简化:

private static int __counter = 0;
public Task<int> GetCounterAsync() => Task.Run(() => __counter++);

现在,我将根据您的代码编写错误的方法,以便您可以看到我的最终代码将如何与您的代码联系起来。我的目标是从“数据库”中检索值,直到得到10并完成该值。

IObservable<int> query =
    Observable.Create<int>(async observer =>
    {
        var done = false;
        do
        {
            var counter = await GetCounterAsync();
            observer.OnNext(counter);
            done = counter == 10;
        } while (!done);
        observer.OnCompleted();
        return () => { };
    });

这是为了看起来像您的代码。我想强调,这是不是这样做的方法。

这是正确的方法。

IObservable<int> query =
    Observable
        .Defer(() => Observable.FromAsync(() => GetCounterAsync()))
        .Repeat()
        .TakeUntil(x => x == 10);

Observable.Defer很重要,因为它每次触发Observable.FromAsync(() => GetCounterAsync())运算符时都会导致重新调用.Repeat()。没有它,第一次调用Observable.FromAsync(() => GetCounterAsync())的结果将无限期重复。

现在,如果您需要在查询中包括状态,这通常是人们使用Observable.Create的原因,那么您始终可以将整个内容包装在另一个Observable.Defer中。

IObservable<int> query =
    Observable
        .Defer(() =>
        {
            var finish = 10;
            return
                Observable
                    .Defer(() => Observable.FromAsync(() => GetCounterAsync()))
                    .Repeat()
                    .TakeUntil(x => x == finish);
        });

通过这种方式,将为每个订户再次创建查询中所需的任何状态。

如果您接下来需要对状态做任何事情,则可以执行此操作:

IObservable<int> query =
    Observable
        .Defer(() =>
        {
            var finish = 10;
            return
                Observable
                    .Defer(() => Observable.FromAsync(() => GetCounterAsync()))
                    .Repeat()
                    .TakeUntil(x => x == finish)
                    .Finally(() => Console.WriteLine($"Finished at {finish}"));
        });

query.Subscribe(x => Console.WriteLine(x));

产生:

01个234567891010完成

让我知道这是否有帮助或您的问题中是否缺少我。


1
投票

在这里,您可以通过包括错误处理和对取消订阅的反应来使它更强大。

return Observable.Create<ExternalSourceChange>(observer =>
{
    var cts = new CancellationTokenSource();
    var fireAndForget = Task.Run(async () =>
    {
        try
        {
            while (true)
            {
                cts.Token.ThrowIfCancellationRequested();
                var changes = await CreateRequest().GetJsonAsync();
                //...
                observer.OnNext(databaseChange);
                //...
                if (changes.pending == 0) break;
            }
            observer.OnCompleted();
        }
        catch (OperationCanceledException ex) when (ex.CancellationToken == cts.Token)
        {
            // Do nothing (the subscriber unsubscribed)
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    });

    return System.Reactive.Disposables.Disposable.Create(() =>
    {
        cts.Cancel();
    });
});

您可以将cts.Token传递给您调用的任何接受取消令牌的异步方法,以更快地终止循环。


Update:我刚刚注意到Observable.Create方法的另一个重载,该方法接受带有CancellationToken参数的异步委托:

// Summary:
// Creates an observable sequence from a specified cancellable asynchronous Subscribe
// method. The CancellationToken passed to the asynchronous Subscribe method is
// tied to the returned disposable subscription, allowing best-effort cancellation.
public static IObservable<TResult> Create<TResult>(
    Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync);

这可以用来简化事情,方法是取消即发即弃任务,最后删除CancellationTokenSourceDisposable.Create

return Observable.Create<ExternalSourceChange>(async (observer, cancellationToken) =>
{ // etc...
© www.soinside.com 2019 - 2024. All rights reserved.