将流转换为可观察的

问题描述 投票:0回答:1

我尝试过这个,但似乎存在并发问题。

我不完全明白出了什么问题。

public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
    var buffer = new byte[bufferSize];

    return Observable
        .FromAsync(async ct => (bytesRead: await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false), buffer))
        .Repeat()
        .TakeWhile(x => x.bytesRead != 0)
        .Select(x => x.buffer)
        .SelectMany(x => x);
}
c# .net stream observable system.reactive
1个回答
0
投票

您可能的并发问题是,当

.SelectMany(x => x)
在缓冲区上执行时,对
stream.ReadAsync
的调用正在覆盖缓冲区。

您需要确保在

FromAsync
中返回缓冲区的副本。

此版本涵盖了这些问题:

public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
    var buffer = new byte[bufferSize];
    return
        Observable
            .FromAsync(async ct =>
            {
                var bytes = await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);
                return buffer.Take(bytes).ToArray();
            })
            .Repeat()
            .TakeWhile(x => x.Length != 0)
            .SelectMany(x => x);
}

我用这个测试了你的原始代码和我的版本:

var bytes1 =
    await
        Observable
            .Using(
                () => File.OpenRead(fileName),
                s => s.ToObservable())
            .ToArray();

var bytes2 = await File.ReadAllBytesAsync(fileName);

Console.WriteLine(bytes1.SequenceEqual(bytes2));

你的每次都失败,而我的却成功。

© www.soinside.com 2019 - 2024. All rights reserved.