我如何从C#中长期存在的HTTP连接接收实时数据?

问题描述 投票:0回答:2

我正在尝试从要与它们发生的情况集成的服务器中处理long-lived HTTP connection的结果。该服务器为每个我要处理的“事件”返回一行JSON(以\n分隔)。

给出分配给变量StreamchangeStream实例,该实例表示来自打开的HTTP连接的字节,这是我正在做的事情的摘录示例:

([requestWebRequest的实例,配置为打开与我要集成的服务器的连接。)

var response = request.GetResponse();
var changeStream = response.GetResponseStream();

var lineByLine = Observable.Using(
    () => new StreamReader(changeStream),
    (streamReader) => streamReader.ReadLineAsync().ToObservable()
);

lineByLine.Subscribe((string line) =>
{
    System.Console.WriteLine($"JSON! ---------=> {line}");
});

使用上面的代码,最终发生的事情是我收到服务器发送的第一行,但此后没有。既不是初始响应,也不是实时活动生成的新响应。

  • 出于我的问题,假定此连接将无限期保持打开状态。
  • 当遇到它们时,如何使system.reactive触发每行的回调?

请注意:此方案不是切换到SignalR的候选方案

c# .net system.reactive http-streaming
2个回答
1
投票

您的尝试只会看到一个ReadLineAsync呼叫。相反,您需要返回每一行。大概是这样;

Observable.Create<string>(async o => {
    var response = await request.GetResponseAsync();
    var changeStream = response.GetResponseStream();
    using var streamReader = new StreamReader(changeStream);
    while (true)
    {
        var line = await streamReader.ReadLineAsync();
        if (line == null)
            break;
        o.OnNext(line);
    }
    o.OnCompleted();
});

0
投票

尽管看起来更复杂,但最好使用内置运算符来完成这项工作。

IObservable<string> query =
    Observable
        .FromAsync(() => request.GetResponseAsync())
        .SelectMany(
            r => Observable.Using(
                () => r,
                r2 => Observable.Using(
                    () => r2.GetResponseStream(),
                    rs => Observable.Using(
                        () => new StreamReader(rs),
                        sr =>
                            Observable
                                .Defer(() => Observable.Start(() => sr.ReadLine()))
                                .Repeat()
                                .TakeWhile(w => w != null)))));

未经测试,但应该靠近。

© www.soinside.com 2019 - 2024. All rights reserved.