我尝试过这个,但似乎存在并发问题。
我不完全明白出了什么问题。
public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
var buffer = new byte[bufferSize];
return Observable
.FromAsync(async ct => (bytesRead: await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false), buffer))
.Repeat()
.TakeWhile(x => x.bytesRead != 0)
.Select(x => x.buffer)
.SelectMany(x => x);
}
您可能的并发问题是,当
.SelectMany(x => x)
在缓冲区上执行时,对 stream.ReadAsync
的调用正在覆盖缓冲区。
您需要确保在
FromAsync
中返回缓冲区的副本。
此版本涵盖了这些问题:
public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
var buffer = new byte[bufferSize];
return
Observable
.FromAsync(async ct =>
{
var bytes = await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);
return buffer.Take(bytes).ToArray();
})
.Repeat()
.TakeWhile(x => x.Length != 0)
.SelectMany(x => x);
}
我用这个测试了你的原始代码和我的版本:
var bytes1 =
await
Observable
.Using(
() => File.OpenRead(fileName),
s => s.ToObservable())
.ToArray();
var bytes2 = await File.ReadAllBytesAsync(fileName);
Console.WriteLine(bytes1.SequenceEqual(bytes2));
你的每次都失败,而我的却成功。