完全可观测序列“长”的时间间隔后

问题描述 投票:2回答:2

以下观察到的序列将每个元件到一个ReplaySubject以便以后可以访问的任何元件,甚至等待ReplaySubject的完成。它完成了一个时间跨度已经达到后ReaplySubject。

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

我想顺序运行,直到它已自上次“OnNext”呼叫,然后完成给定的时间。这将是在各种蓝牙通信场景非常有用,蓝牙设备会给我的数据的序列,然后就停止,没有任何形式完成消息或事件。在这些情况下,我需要试探性地确定序时完成,然后完成它自己。所以,如果它是“太长”自从上次蓝牙通​​知我想完成ReplaySubject。

我可以通过创建一个定时器,每收到一个元素时将其复位,然后完成ReplaySubject当计时器达到“太长”这样做,但我听说,创建对象和可观察到的内部认购从操纵它不是线程安全的。

在“太长”的时间间隔后,我如何能够完成一个序列什么建议吗?

这里是不是线程从我所听到的安全的,但应该按预期工作的一个版本:

bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
    reading = false;
};

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeWhile(x => reading)
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            timer.Stop();
            timer.Start();

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

这似乎是满意的基础上Simonare的第一个答案:

                characteristic.WhenNotificationReceived()
                .Timeout(TimeSpan.FromSeconds(1))
                .Subscribe(
                    onNext: result =>
                    {
                        string nextTag = BitConverter.ToString(result.Data);
                        nextTag = nextTag.Replace("-", "");

                        rfidPlayer.OnNext(nextTag);
                    },
                    onError: error =>
                    {
                        rfidPlayer.OnCompleted();
                    });
c# asynchronous system.reactive
2个回答
0
投票

你可以考虑使用Timeout Operator 。这样做的唯一的缺点是它错误信号终止。您可能需要处理虚假错误

超时运营商允许您在可观察的失败,在指定的时间跨度内发出的任何项目中止可观察到的与终止的onError。

如果你使用的方法下面你可以超越错误

 .Timeout(200, Promise.resolve(42));

另一种变体,您可以指示超时切换到可观察到一个备份您指定的,而不是一个错误终止,如果超时条件被触发。


characteristic.WhenNotificationReceived()
    .Timeout(TimeSpan.FromSeconds(1))
    .Subscribe(
        onNext: result =>
        {
            ....
            rfidPlayer.OnNext(....);
        },
        onError: error =>
        {
            rfidPlayer.OnCompleted();
        });

0
投票

我觉得讨厌的使用,因为异常的Timeout

我更愿意注入一个值,我可以使用终止序列的序列。如果我的序列,例如,生产非负数那么如果我注入-1我知道结束顺序。

下面是一个例子:

与此观察到的开始,其生成以1开始的2的幂,它也延迟了生产由待定值的毫秒数的每个值。

Observable
    .Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))

所以1,2,4,8,等等,越来越慢。

现在,我想如果没有值3.0秒钟,然后我可以做到这一点停止序列:

    .Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
    .Switch()
    .TakeWhile(x => x >= 0)

如果我运行这个程序,我得到这样的输出:

1 
2 
4 
8 
16 
32 
64 
128 
256 
512 
1024 
2048 

该序列是即将产生4096但它首先等待4096毫秒,以产生值 - 在此同时所述Observable.Timer(TimeSpan.FromSeconds(3.0))火灾,并输出因此-1停止序列。

该查询的关键部分是使用Switch的。它需要一个IObservable<IObservable<T>>并只订阅最新的外部观察到的,从以前的一个退订产生IObservable<T>

所以,在我的查询,通过序列生成的每个新值停止并重新启动Timer

在你的情况下,您观察到的是这样的:

characteristic
    .WhenNotificationReceived()
    .Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
    .Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
    .Switch()
    .TakeWhile(x => x != null)
    .Subscribe(rfidPlayer);
© www.soinside.com 2019 - 2024. All rights reserved.