Rx.NET 缓冲区在报价聚合时存在延迟问题

问题描述 投票:0回答:2

我正在使用反应式库基于符号报价(外汇和差价合约)进行 1 秒柱聚合

到目前为止,IAsyncEnumerable 扩展还没有成功,所以我决定用推送策略替换拉取策略,其中反应式与操作符一起发挥作用。不幸的是我不熟悉它并陷入了另一个问题。不管怎样,我已经非常接近最终结果了。

我想要实现的是根据同一秒的“TickTime”按符号报价进行分组。我面临的问题是报价时间 - 来自经纪商端的 TickTime 和接收时间 - 来自通道的 ReceivedTime 之间的延迟(延迟约为 10 到 30 毫秒)。我尝试将延迟运算符与缓冲区(时间跨度持续时间)结合使用,但我想这不是正确的选择。我知道我应该使用窗口操作符,但我不知道如何使用,因为反应式方法对我来说是新的东西,我仍在学习。

注意事项: 如果 1 秒内没有出现报价,则可观察的应该发布空列表(某种超时),并且在我的情况下不可能进行后续更新。

这是我的短代码

    await Task.Run(() => {
    
    var asset = symbol;

    var observable = streamQuotes.ToObservable()
        .Delay(TimeSpan.FromMilliseconds(15))
        .Buffer(TimeSpan.FromSeconds(1), Scheduler.Default)
        .Subscribe(quotes =>
        {
            var topic = Topic.ToObject($"{provider}-{asset}-{Topic.SECOND}-1");

            if (DebugMode)
                Logger.LogInformation(
                    $"{topic} {JsonConvert.SerializeObject(quotes, Formatting.Indented)}");

            //_ = AggregateAsync(topic, quotes, epochBegin, epochStep++), cancellationToken);
        });

    cancellationToken.Register(() => observable.Dispose());
    Subscriptions.Add(observable);

}, cancellationToken);

产生以下输出

  {
    "Id": 142,
    "Bid": 45282.72,
    "Ask": 45341.63,
    "Symbol": "BTCUSD",
    "Time": 1707431630102,
    "TickTime": "2024-02-08T22:33:50.102+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.1187799+00:00"
  },
  {
    "Id": 142,
    "Bid": 45283.05,
    "Ask": 45341.96,
    "Symbol": "BTCUSD",
    "Time": 1707431630552,
    "TickTime": "2024-02-08T22:33:50.552+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.5654973+00:00"
  },
  {
    "Id": 142,
    "Bid": 45283.86,
    "Ask": 45342.77,
    "Symbol": "BTCUSD",
    "Time": 1707431630591,
    "TickTime": "2024-02-08T22:33:50.591+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.6046997+00:00"
  },
  {
    "Id": 142,
    "Bid": 45285.07,
    "Ask": 45343.98,
    "Symbol": "BTCUSD",
    "Time": 1707431630702,
    "TickTime": "2024-02-08T22:33:50.702+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.7155601+00:00"
  },
  {
   "Id": 142,
   "Bid": 45284.73,
   "Ask": 45343.64,
   "Symbol": "BTCUSD",
   "Time": 1707431630753,
   "TickTime": "2024-02-08T22:33:50.753+00:00",
   "ReceivedTime": "2024-02-08T22:33:50.7658716+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.45,
    "Ask": 45343.36,
    "Symbol": "BTCUSD",
    "Time": 1707431630853,
    "TickTime": "2024-02-08T22:33:50.853+00:00",
    "ReceivedTime": "2024-02-08T22:33:50.866821+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.54,
    "Ask": 45343.46,
    "Symbol": "BTCUSD",
    "Time": 1707431631003,
    "TickTime": "2024-02-08T22:33:50.999+00:00",
    "ReceivedTime": "2024-02-08T22:33:51.005728+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.64,
    "Ask": 45343.55,
    "Symbol": "BTCUSD",
    "Time": 1707431631040,
    "TickTime": "2024-02-08T22:33:51.03+00:00",
    "ReceivedTime": "2024-02-08T22:33:51.0530009+00:00"
  },
  {
    "Id": 142,
    "Bid": 45284.64,
    "Ask": 45343.55,
    "Symbol": "BTCUSD",
    "Time": 1707431631040,
    "TickTime": "2024-02-08T22:33:51.05+00:00",
    "ReceivedTime": "2024-02-08T22:33:51.056112+00:00"
  }

输出错误,因为列表中的最后 2 个对象属于下一秒。

请帮助我解决这个问题,或者至少为我指出正确的方向。

window buffer system.reactive
2个回答
0
投票

由于延迟和缓冲区依赖于平均精度为 15 毫秒的系统计时器,因此您的缓冲将始终在每秒之间滑动。

您需要依赖绝对事件时间。请看下面我的示例,它不依赖于计时器,而仅依赖于绝对时间。

static async Task Main()
{
    var rnd = new Random();

    var quotes = Observable
                .Interval(TimeSpan.FromMilliseconds(300))
                .Select(_ => rnd.NextDouble());

    var sub = quotes
                .Timestamp()
                .Do(x => Console.WriteLine($"{x.Timestamp:mm:ss.fff}: {x.Value:N}"))
                .GroupByUntil(
                    q => q.Timestamp.DateTime.AddSeconds(1).ToString("s"), 
                    g => Observable.Return(Unit.Default).DelaySubscription(DateTime.Parse(g.Key, null, DateTimeStyles.AssumeUniversal)))
                .Select(group => group.ToList())
                .Concat()
                .Subscribe(list =>
                {
                    if(!list.Any()) return;

                    Console.WriteLine($"{list.First().Timestamp:HH:mm:ss.fff}-{list.Last().Timestamp:ss:fff}: Count: {list.Count}, Open: {list.First().Value:N}, High: {list.Max(x=>x.Value):N}, Low: {list.Min(x=>x.Value):N}, Close: {list.Last().Value:N}");
                });

    Console.ReadLine();     
    sub.Dispose();              
}

如有需要解释,请随时询问。


0
投票

这可能应该为您解决问题,或者至少引导您走上正确的道路:

var observable = streamQuotes.ToObservable()
    .GroupBy(q => q.TickTime.Ticks / TimeSpan.TicksPerSecond) 
    .SelectMany(g => g.Buffer(TimeSpan.FromSeconds(1), Scheduler.Default))
    .Merge()
    .Subscribe(quotes =>
    {
        Console.Write(JsonConvert.SerializeObject(quotes, Newtonsoft.Json.Formatting.Indented));

    });

您的代码存在一些问题:

  1. 看起来您想按 TickTime 属性的秒
    value
    进行缓冲/分组。您调用的
    Buffer
    运算符(以及
    Window
    运算符)将在代码运行和数据到达时按秒偏移量进行缓冲。它根本不查看您的数据。
  2. 如果你对窗口开口进行胡闹,你可以使用缓冲区,但这比它的价值更麻烦。

解决方案中的

GroupBy
首先按基于刻度的第二个值对数据进行分组。
Buffer
然后将它们形成一个列表。

© www.soinside.com 2019 - 2024. All rights reserved.