我正在尝试从要与它们发生的情况集成的服务器中处理long-lived HTTP connection的结果。该服务器为每个我要处理的“事件”返回一行JSON(以\n
分隔)。
给出分配给变量Stream
的changeStream
实例,该实例表示来自打开的HTTP连接的字节,这是我正在做的事情的摘录示例:
([request
是WebRequest
的实例,配置为打开与我要集成的服务器的连接。)
var response = request.GetResponse();
var changeStream = response.GetResponseStream();
var lineByLine = Observable.Using(
() => new StreamReader(changeStream),
(streamReader) => streamReader.ReadLineAsync().ToObservable()
);
lineByLine.Subscribe((string line) =>
{
System.Console.WriteLine($"JSON! ---------=> {line}");
});
使用上面的代码,最终发生的事情是我收到服务器发送的第一行,但此后没有。既不是初始响应,也不是实时活动生成的新响应。
请注意:此方案不是切换到SignalR的候选方案
您的尝试只会看到一个ReadLineAsync
呼叫。相反,您需要返回每一行。大概是这样;
Observable.Create<string>(async o => {
var response = await request.GetResponseAsync();
var changeStream = response.GetResponseStream();
using var streamReader = new StreamReader(changeStream);
while (true)
{
var line = await streamReader.ReadLineAsync();
if (line == null)
break;
o.OnNext(line);
}
o.OnCompleted();
});
尽管看起来更复杂,但最好使用内置运算符来完成这项工作。
IObservable<string> query =
Observable
.FromAsync(() => request.GetResponseAsync())
.SelectMany(
r => Observable.Using(
() => r,
r2 => Observable.Using(
() => r2.GetResponseStream(),
rs => Observable.Using(
() => new StreamReader(rs),
sr =>
Observable
.Defer(() => Observable.Start(() => sr.ReadLine()))
.Repeat()
.TakeWhile(w => w != null)))));
未经测试,但应该靠近。