C# RX - GroupBy 仅第一次运行

问题描述 投票:0回答:1

我有执行以下操作的 rx 管道 -

  1. 每 10 秒运行一次。
  2. 使用扫描生成周期(1 分钟增量)。
  3. 获取当前时间段的数据。
  4. 将它们投影为 IEvent 类型的对象流。
  5. 然后组基于 IEvent.Channel。
  6. 打印频道。

最后一步仅运行一次。我期待无限奔跑。 我在这里缺少什么?

var observable = Observable
    .Interval(TimeSpan.FromSeconds(10))                
    .Scan(seed, NextPeriod)                
    .SelectMany(period =>
    {
        Console.WriteLine();
        Console.WriteLine("fetching events for");
        Console.WriteLine(period.Begin.ToString("yyyy-MM-dd hh:mm:ss.ffff"));
        Console.WriteLine(period.End.ToString("yyyy-MM-dd hh:mm:ss.ffff"));

        var events = FetchEvents(period, eventDataProvider);
        Console.WriteLine($"{events.Count} events fetched");

        return events;
    })
    .GroupBy(x => x.Channel).Subscribe(group =>
    {
        Console.WriteLine($"Channel - {group.Key}");
    });

控制台输出

events generated

fetching events for
2024-05-15 03:16:00.0000
2024-05-15 03:16:59.0000
40 events fetched
Channel - group-1
Channel - group-2
Channel - group-3

fetching events for
2024-05-15 03:17:00.0000
2024-05-15 03:17:59.0000
1475 events fetched

fetching events for
2024-05-15 03:18:00.0000
2024-05-15 03:18:59.0000
1475 events fetched

c# reactive-programming system.reactive
1个回答
0
投票

这是您正在做的事情的独立版本:

IDisposable subscription =
    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Take(3)
        .SelectMany(x => new[]
        {
            new { Channel = "a", Value = x, },
            new { Channel = "b", Value = x, },
            new { Channel = "c", Value = x, },
        })
        .Do(x => Console.WriteLine($"{x.Channel}, {x.Value}"))
        .GroupBy(x => x.Channel)
        .Subscribe(group =>
        {
            Console.WriteLine($"Channel - {group.Key}");
        });

SelectMany
正在产生这个价值流:

a, 0
b, 0
c, 0
a, 1
b, 1
c, 1
a, 2
b, 2
c, 2

现在

GroupBy
Channel
分组并生成
IObservable<IGroupedObservable<...>>
。这是一个嵌套的可观察对象。非常有效
IObservable<IObservable<...>>

第一次看到每个通道时,它会订阅 outer 可观察对象,并输出通道的键 ,用于它使用该键看到的第一个可观察对象。你没有对内部可观察值做任何事情 - 所以内部可观察值返回一个值多少次并不重要 - 你只是还没有订阅它。

© www.soinside.com 2019 - 2024. All rights reserved.