Reactive-如何结合/合并/查找具有两个序列的项目

问题描述 投票:0回答:1

我正在连接到一个网络服务,该服务为我提供一天的所有价格(没有时间信息)。每个价格结果都有对应的“批量运行”的ID。

“批生产”具有日期和时间戳,但我必须单独致电以获取当天的所有批生产信息。

因此,要获得每个结果的实际时间,我需要将两个API调用结合在一起。

我为此使用了Reactive,但是我无法可靠地合并两组数据。我以为CombineLatest可以做到这一点,但是它似乎并没有按照我的想法工作(基于http://reactivex.io/documentation/operators/combinelatest.htmlhttp://introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombineLatest)。

    [TestMethod]
    public async Task EvenMoreBasicCombineLatestTest()
    {
        int batchStart = 100, batchCount = 10;

        //create 10 results with batch ids [100, 109]
        //the test uses lists just to make debugging easier
        var resultsWithBatchIdList = Enumerable.Range(batchStart, batchCount)
            .Select(id => new { BatchRunId = id, ResultValue = id * 10 })
            .ToList();
        var resultsWithBatchId = Observable.ToObservable(resultsWithBatchIdList);
        Assert.AreEqual(batchCount, await resultsWithBatchId.Count());

        //create 10 batches with ids [100, 109]
        var batchesList = Enumerable.Range(batchStart, batchCount)
            .Select(id => new
            {
                ThisId = id,
                BatchName = String.Concat("abcd", id)
            })
            .ToList();
        var batchesObservable = Observable.ToObservable(batchesList);
        Assert.AreEqual(batchCount, await batchesObservable.Count());

        //turn the batch set into a dictionary so we can look up each batch by its id
        var batchRunsByIdObservable = batchesObservable.ToDictionary(batch => batch.ThisId);

        //for each result, look up the corresponding batch id in the dictionary to join them together
        var resultsWithCorrespondingBatch =
            batchRunsByIdObservable
            .CombineLatest(resultsWithBatchId, (batchRunsById, result) =>
            {
                Assert.AreEqual(NumberOfResultsToCreate, batchRunsById.Count);

                var correspondingBatch = batchRunsById[result.BatchRunId];

                var priceResultAndSourceBatch = new 
                {
                    Result = result,
                    SourceBatchRun = correspondingBatch
                };
                return priceResultAndSourceBatch;
            });
        Assert.AreEqual(batchCount, await resultsWithCorrespondingBatch.Count());
    }

我希望随着'results'可观察结果的每个元素的出现,它将与可观察到的batch-id词典的每个元素结合在一起(后者只有一个元素)。但是,相反,似乎只有结果列表的最后一个元素才被加入。

我由此产生了一个更复杂的问题,但是在尝试创建最小再现时,即使这样做也会给我带来意想不到的结果。版本3.1.1、4.0.0、4.2.0等会发生这种情况。

((请注意,这些序列通常不像此人工示例中那样匹配,因此我不能仅将它们Zip匹配。)

所以我该如何加入?我想通过Dictionary查找更多信息的结果流(也来自Observable)?

还要注意,目标是返回IObservable(resultsWithCorrespondingBatch),所以我不能只是await batchRunsByIdObservable。

c# system.reactive
1个回答
0
投票

好吧,我想我明白了。我希望文档中的两个大理石图中的任何一个都稍有不同-会使CombineLatest的细微之处更加明显:

N------1---2---3---
L--z--a------bc----

R------1---2-223---
       a   a bcc 

它是结合在一起的[[最新”-因此,根据发出项目的时间,可能会丢失一些元组。我应该做的是SelectMany

否:.CombineLatest(resultsWithBatchId, (batchRunsById, result) =>是:.SelectMany(batchRunsById => resultsWithBatchId.Select(result =>
© www.soinside.com 2019 - 2024. All rights reserved.