如果可观察量生成太多值太快,有没有办法取消和替换它?

问题描述 投票:0回答:1

我有一个以可变速度生成值的 Observable。为了不被值淹没,我添加了三秒的 Throttle,因此只有在三秒内没有发布更多值的情况下我才会获得一个值。但我想要的是,如果我在一段时间内获得一定数量的更新,则结束流并将其替换为另一个可观察的。

例如,如果我在三秒内收到 50 个更新,则结束流并将其替换为不同的流,类似于 Catch 如何将因异常终止的可观察对象替换为另一个可观察对象。

类似于下面的内容,但没有抛出异常,因此不能使用 Catch:

myObservable
   .Throttle(TimeSpan.FromSeconds(3)) //Not sure if we need to remove Throttle
   .Catch<long, Exception>(e => Observable.Return(long)0)  //Instead of catching an exception, some way to monitor how many updates are coming in before throttling
   .Subscribe

编辑: 我添加了一个弹珠图来尝试展示我正在寻找的内容。

初始可观察值以可变速率产生值。值 1-6 进来,3 秒内突发 50 个值都没有,这些值传递到节流阀,产生最终值 1、5 和 6。

然后,初始可观察值在 3 秒内生成值 7-60。这是我想做的事情“???”正在显示。这个想法是为了认识到在设定的时间范围内生产了 50 件或更多的物品,完成原始的 obs。并将其替换为我提供的,类似于您提供 obs 的方式。 Catch 中的序列来替换出错的序列(例如,如果我看到原始序列产生了巨大的突发并引发了异常)。

初始 obs 之后。被替换后,序列将继续使用新的序列,生产的物品将通过现有的节流阀。

如果在“???”中检查的时间范围内只有 49 个项目,这些值将全部传递到 Throttle,并且只会生成最后一个。如果根本没有更新,那么什么也不会发生,也不会产生任何输出。

希望我现在问的问题更清楚了。

c# observable reactive-programming system.reactive
1个回答
0
投票

您可以使用

Scan()
构建最后 50 个项目的滑动窗口(无法使其与
Buffer()
Window()
一起使用,但我想这是可能的)。每个项目都带有时间戳。然后,对于每个滑动窗口,检查第一个和最后一个时间戳并检查它们是否太接近。如果是这种情况,您可以切换到另一个可观察的。

要使用时间戳“丰富”值,您只需使用

Select()
:

IObservable<Tuple<int, DateTime>> sourceWithTimeInfo = source.Select(it =>
{
    return Tuple.Create(it, DateTime.UtcNow);
});

然后使用

Scan()
构建滑动窗口:

IObservable<IList<Tuple<int, DateTime>>> bufferedSource = sourceWithTimeInfo.Scan((IList<Tuple<int, DateTime>>)new List<Tuple<int, DateTime>>(),
(acc, it) =>
{
    while (acc.Count >= countLimit)
    {
        acc.RemoveAt(0);
    }
    acc.Add(it);
    return acc;
});

现在您的“值”是一个元组列表,其中每个条目都有值及其发出的时间戳。当开始和结束时间戳太接近时,我们使用

TakeWhile()
停止发出值:

IObservable<IList<Tuple<int, DateTime>>> stopWhenTooMuchSource = bufferedSource.TakeWhile(it => 
{
    if (it.Count < countLimit)
    {
        return true;
    }
    DateTime firstTime = it.First().Item2;
    DateTime lastTime = it.Last().Item2;
    TimeSpan timeDiff = lastTime - firstTime;
    if (timeDiff < TimeSpan.FromSeconds(timeLimitInSeconds)) 
    {
        return false;
    }
    return true;
});

出于调试目的,我们将打印第一个和最后一个条目的值:

IObservable<IList<Tuple<int, DateTime>>> withDebugging = stopWhenTooMuchSource.Do(it =>
{
    Console.WriteLine("Count: "+it.Count);
    if (it.Count > 0) 
    {
        DateTime firstTime = it.First().Item2;
        DateTime lastTime = it.Last().Item2;
        TimeSpan timeDiff = lastTime - firstTime;
        Console.WriteLine("Timediff is: "+timeDiff);
    } 
});

然后我们再次“提取”原始值:

IObservable<int> onlyValueOfLastItem = withDebugging.Select(it => it.Last().Item1);

现在我们有一个流,当时间戳太接近时,它会“死亡”。我们可以使用简单的

Concat()
(或
Switch()
)切换到另一个可观察值:

IObservable<int> concatSource = onlyValueOfLastItem.Concat(Observable.Return(-1));

这是完整的源代码:

static void Main(string[] args)
{
    ISubject<int> source = new Subject<int>();

    IObservable<Tuple<int, DateTime>> sourceWithTimeInfo = source.Select(it =>
    {
        return Tuple.Create(it, DateTime.UtcNow);
    });

    int countLimit = 50;
    int timeLimitInSeconds = 3;

    IObservable<IList<Tuple<int, DateTime>>> bufferedSource = sourceWithTimeInfo.Scan((IList<Tuple<int, DateTime>>)new List<Tuple<int, DateTime>>(),
    (acc, it) =>
    {
        while (acc.Count >= countLimit)
        {
            acc.RemoveAt(0);
        }
        acc.Add(it);
        return acc;
    });

    IObservable<IList<Tuple<int, DateTime>>> stopWhenTooMuchSource = bufferedSource.TakeWhile(it => 
    {
        if (it.Count < countLimit)
        {
            return true;
        }
        DateTime firstTime = it.First().Item2;
        DateTime lastTime = it.Last().Item2;
        TimeSpan timeDiff = lastTime - firstTime;
        if (timeDiff < TimeSpan.FromSeconds(timeLimitInSeconds)) 
        {
            return false;
        }
        return true;
    });

    IObservable<IList<Tuple<int, DateTime>>> withDebugging = stopWhenTooMuchSource.Do(it =>
    {
        Console.WriteLine("Count: "+it.Count);
        if (it.Count > 0) 
        {
            DateTime firstTime = it.First().Item2;
            DateTime lastTime = it.Last().Item2;
            TimeSpan timeDiff = lastTime - firstTime;
            Console.WriteLine("Timediff is: "+timeDiff);
        } 
    });

    IObservable<int> onlyValueOfLastItem = withDebugging
        .Select(it => it.Last().Item1);

    IObservable<int> concatSource = onlyValueOfLastItem.Concat(Observable.Return(-1));

    Console.WriteLine("Subscribe start");
    concatSource.Subscribe(it =>
    {
        Console.WriteLine(it);
    });
    
    Thread t = new Thread(() =>
    {
        int maxDelay = 300;
        int counter = 1;
        while (maxDelay > 0)
        {
            Console.WriteLine("maxDelay is: "+maxDelay);
            source.OnNext(counter++);
            int sleepAmount = Random.Shared.Next(1, maxDelay);
            maxDelay--;
            Thread.Sleep(sleepAmount);
        }
    });
    t.Start();

    t.Join();

    Console.WriteLine("Program ends");
}

这可以生成如下输出:

              [...]
maxDelay is: 111
Count: 50
Timediff is: 00:00:03.0516947
190
maxDelay is: 110
Count: 50
Timediff is: 00:00:03.0656792
191
maxDelay is: 109
Count: 50
Timediff is: 00:00:03.0892908
192
maxDelay is: 108
Count: 50
Timediff is: 00:00:03.1163132
193
maxDelay is: 107
Count: 50
Timediff is: 00:00:03.1003305
194
maxDelay is: 106
Count: 50
Timediff is: 00:00:03.0078090
195
maxDelay is: 105
-1
              [...]

您仍然需要在正确的位置再次读取

Throttle()
调用。

© www.soinside.com 2019 - 2024. All rights reserved.