将 Observable 转储到一个流中

问题描述 投票:-1回答:1

我目前有一个 IObservable<byte[]>,它实际上是源文件的一系列数据块,是用这个方法(原文此处为链接)生成的。

它将流“chunkify”为byte[]序列。

问题是,有了这个序列之后,我想把它写入目标流。换句话说,我必须把每个 byte[] 依次写入文件流,直到序列完成;同时,调用方应当一直等待,直到整个序列写入完毕。

到目前为止,我创建的代码是有效的,但我担心这不是正确的方法。处理IObservable<byte[]>的相关部分是Download方法。

/// <summary>
/// Entry point: downloads a fixed archive into the temp directory and prints
/// the path it was saved to.
/// </summary>
async Task Main()
{
    const string sourceUrl = "https://github.com/gus33000/MSM8994-8992-NT-ARM64-Drivers/archive/master.zip";
    string destinationPath = Path.Combine(Path.GetTempPath(), "test.zip");

    using (var httpClient = new HttpClient())
    {
        // The downloader borrows the client; the client is disposed when this scope ends.
        var downloader = new HttpDownloader(httpClient);

        await downloader.Download(sourceUrl, destinationPath);
        Console.WriteLine($"File downloaded to {destinationPath}");
    }
}

/// <summary>
/// Downloads an HTTP resource by reading the response body as a sequence of
/// <c>byte[]</c> chunks and writing each chunk, in order, to a destination stream.
/// </summary>
public class HttpDownloader
{
    private const int BufferSize = 8192;

    private readonly HttpClient client;

    /// <summary>Creates a downloader that issues its requests through <paramref name="client"/>.</summary>
    /// <param name="client">The HTTP client to use; it is not disposed by this class.</param>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is null.</exception>
    public HttpDownloader(HttpClient client)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        this.client = client;
    }

    /// <summary>Downloads <paramref name="url"/> into the file at <paramref name="path"/>.</summary>
    /// <param name="url">Address of the resource to download.</param>
    /// <param name="path">Destination file; created, or overwritten if it already exists.</param>
    /// <param name="progressObserver">Optional sink for progress notifications.</param>
    /// <param name="timeout">Maximum number of seconds to wait for the next chunk.</param>
    /// <returns>A task that completes once every chunk has been written.</returns>
    public async Task Download(string url, string path, IDownloadProgress progressObserver = null, int timeout = 30)
    {
        // File.Create truncates an existing file. File.OpenWrite does not, so a
        // shorter download would leave stale bytes from the old file at the end.
        using (var fileStream = File.Create(path))
        {
            await Download(url, fileStream, progressObserver, timeout);
        }
    }

    /// <summary>
    /// Streams the response body of <paramref name="url"/> into <paramref name="destination"/>,
    /// reporting progress after each chunk is read.
    /// </summary>
    private async Task Download(string url, Stream destination, IDownloadProgress progressObserver = null,
        int timeout = 30)
    {
        long? totalBytes = 0;
        long bytesWritten = 0;

        await ObservableMixin.Using(() => client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead),
                response =>
                {
                    totalBytes = response.Content.Headers.ContentLength;
                    if (!totalBytes.HasValue)
                    {
                        // Unknown length: signal "indeterminate" once, up front.
                        progressObserver?.Percentage?.OnNext(double.PositiveInfinity);
                    }

                    return ObservableMixin.Using(() => response.Content.ReadAsStreamAsync(),
                        contentStream => contentStream.ReadToEndObservable());
                })
            .Do(bytes =>
            {
                bytesWritten += bytes.Length;
                if (totalBytes.HasValue)
                {
                    progressObserver?.Percentage?.OnNext((double)bytesWritten / totalBytes.Value);
                }

                progressObserver?.BytesDownloaded?.OnNext(bytesWritten);
            })
            .Timeout(TimeSpan.FromSeconds(timeout))
            .Select(bytes => Observable.FromAsync(async () =>
            {
                await destination.WriteAsync(bytes, 0, bytes.Length);
                return Unit.Default;
            }))
            // Merge(1) runs one write at a time, so chunks land in arrival order.
            .Merge(1)
            // Awaiting an observable yields its last element and throws when there is
            // none; an empty response body must still count as a successful download.
            .DefaultIfEmpty();
    }

    /// <summary>
    /// Downloads <paramref name="url"/> into a temporary file and returns that file as a stream.
    /// </summary>
    /// <param name="url">Address of the resource to download.</param>
    /// <param name="progress">Optional sink for progress notifications.</param>
    /// <param name="timeout">Maximum number of seconds to wait for the next chunk.</param>
    /// <returns>
    /// A stream over the downloaded content, positioned at the start. The backing file is
    /// opened with <see cref="FileOptions.DeleteOnClose"/>; the caller owns and must dispose the stream.
    /// </returns>
    public async Task<Stream> GetStream(string url, IDownloadProgress progress = null, int timeout = 30)
    {
        // GetTempFileName already returns a full path, so no Path.Combine is needed.
        var tmpFile = Path.GetTempFileName();
        var stream = File.Create(tmpFile, BufferSize, FileOptions.DeleteOnClose);

        try
        {
            await Download(url, stream, progress, timeout);

            // Rewind so the caller can read the content from the beginning.
            stream.Position = 0;
            return stream;
        }
        catch
        {
            // Nobody else holds the stream on failure; dispose it so the handle
            // is released and the temp file is deleted.
            stream.Dispose();
            throw;
        }
    }
}

/// <summary>
/// Receives progress notifications while a download is running.
/// </summary>
public interface IDownloadProgress
{
    /// <summary>
    /// Download completion ratio. HttpDownloader in this file pushes
    /// bytesWritten / totalBytes (a fraction, not a value multiplied by 100), and
    /// <see cref="double.PositiveInfinity"/> when the response has no Content-Length.
    /// </summary>
    ISubject<double> Percentage { get; set; }

    /// <summary>
    /// Running total of bytes received so far; HttpDownloader in this file pushes
    /// the cumulative count after each chunk.
    /// </summary>
    ISubject<long> BytesDownloaded { get; set; }
}

/// <summary>
/// Observable helpers that complement the built-in <c>Observable</c> factory methods.
/// </summary>
public static class ObservableMixin
{
    /// <summary>
    /// Variant of <c>Observable.Using</c> for resources that are obtained asynchronously.
    /// </summary>
    /// <typeparam name="TSource">Element type of the resulting sequence.</typeparam>
    /// <typeparam name="TResource">Disposable resource type.</typeparam>
    /// <param name="resourceFactoryAsync">Asynchronously produces the resource.</param>
    /// <param name="observableFactory">Builds the sequence that depends on the resource.</param>
    /// <returns>
    /// The sequence built by <paramref name="observableFactory"/>, wrapped in
    /// <c>Observable.Using</c> over the acquired resource.
    /// </returns>
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable
    {
        // Step 1: acquire the resource as a single-element sequence.
        var acquisition = Observable.FromAsync(resourceFactoryAsync);

        // Step 2: hand the acquired instance to Observable.Using and flatten the result.
        return acquisition.SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
    }
}

/// <summary>
/// Extension methods that expose <see cref="Stream"/> reads as observable sequences
/// of <c>byte[]</c> chunks.
/// </summary>
public static class StreamExtensions
{
    // Chunk size used when the caller does not specify one.
    internal const int defaultBufferSize = 4096;

    /// <summary>
    /// Reads <paramref name="stream"/> to its end in chunks of up to
    /// <see cref="defaultBufferSize"/> bytes.
    /// </summary>
    public static IObservable<byte[]> ReadToEndObservable(this Stream stream)
    {
        return stream.ReadToEndObservable(new byte[defaultBufferSize]);
    }

    /// <summary>
    /// Reads <paramref name="stream"/> to its end in chunks of up to
    /// <paramref name="bufferSize"/> bytes.
    /// </summary>
    public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    {
        return stream.ReadToEndObservable(new byte[bufferSize]);
    }

    /// <summary>
    /// Repeatedly reads <paramref name="stream"/> into <paramref name="buffer"/> and emits
    /// each non-empty result until a read returns no data, then completes.
    /// </summary>
    /// <remarks>
    /// The buffer is supplied by the caller when this method is invoked and is captured by
    /// the returned observable, so every subscription reuses the same buffer instance.
    /// </remarks>
    internal static IObservable<byte[]> ReadToEndObservable(this Stream stream, byte[] buffer)
    {
        return Observable.Create<byte[]>(
            observer =>
            {
                // Holds the subscription of the read currently in progress.
                var subscription = new SerialDisposable();

                // Disposing the result tears down both the in-flight read and the scheduled loop.
                return new CompositeDisposable(
                    subscription,
                    Scheduler.Immediate.Schedule(
                        self =>
                        {
                            // Reset per iteration; cleared when a read returns no data.
                            bool continueReading = true;

                            subscription.SetDisposableIndirectly(() =>
                                stream.ReadObservable(buffer).SubscribeSafe(
                                    data =>
                                    {
                                        if (data.Length > 0)
                                        {
                                            observer.OnNext(data);
                                        }
                                        else
                                        {
                                            // An empty chunk marks the end of the stream.
                                            continueReading = false;
                                        }
                                    },
                                    observer.OnError,
                                    () =>
                                    {
                                        // The single read has finished: either loop or finish.
                                        if (continueReading)
                                        {
                                            self();
                                        }
                                        else
                                        {
                                            observer.OnCompleted();
                                        }
                                    }));
                        }));
            });
    }

    /// <summary>
    /// Performs one read into <paramref name="buffer"/> and emits a new array holding
    /// exactly the bytes that were read (an empty array when nothing was read).
    /// </summary>
    internal static IObservable<byte[]> ReadObservable(this Stream stream, byte[] buffer)
    {
        return stream.ReadObservable(buffer, 0, buffer.Length).Select(
            read =>
            {
                byte[] data;

                if (read <= 0)
                {
                    data = new byte[0];
                }
                else if (read == buffer.Length)
                {
                    // Buffer is completely filled: copy all of it.
                    data = (byte[])buffer.Clone();
                }
                else
                {
                    // Partial read: copy only the first `read` bytes.
                    data = new byte[read];

                    Array.Copy(buffer, data, read);
                }

                // Always a copy, so the shared buffer can be reused for the next read.
                return data;
            });
    }

    /// <summary>
    /// Wraps a single <c>Stream.ReadAsync</c> call as an observable that yields the
    /// number of bytes read; the observable's cancellation token is passed to the read.
    /// </summary>
    /// <remarks>
    /// NOTE(review): Observable.StartAsync is understood to start the operation when
    /// this method is called rather than on subscription - confirm against the Rx docs.
    /// </remarks>
    public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer, int offset, int count)
    {
        return Observable.StartAsync(cancel => stream.ReadAsync(buffer, offset, count, cancel));
    }   
}

/// <summary>
/// Helpers for assigning the inner disposable of a <c>SerialDisposable</c>.
/// </summary>
public static class SerialDisposableExtensions
{
    /// <summary>
    /// Installs a placeholder into <paramref name="disposable"/> first and only then runs
    /// <paramref name="factory"/>, storing its result inside that placeholder.
    /// </summary>
    /// <param name="disposable">The serial disposable to update.</param>
    /// <param name="factory">Creates the disposable that ends up in the placeholder.</param>
    public static void SetDisposableIndirectly(this SerialDisposable disposable, Func<IDisposable> factory)
    {
        // The placeholder must be in place before the factory runs, so the order matters.
        var placeholder = new SingleAssignmentDisposable();
        disposable.Disposable = placeholder;

        IDisposable produced = factory();
        placeholder.Disposable = produced;
    }
}


/// <summary>
/// Delegate-based convenience overload for <c>SubscribeSafe</c>.
/// </summary>
public static class SafeObservableExtensions
{
    /// <summary>
    /// Builds an observer from the three callbacks and subscribes it to
    /// <paramref name="source"/> through the observer-based <c>SubscribeSafe</c>.
    /// </summary>
    /// <param name="source">Sequence to subscribe to.</param>
    /// <param name="onNext">Invoked for each element.</param>
    /// <param name="onError">Invoked when the sequence fails.</param>
    /// <param name="onCompleted">Invoked when the sequence completes.</param>
    /// <returns>The subscription handle.</returns>
    public static IDisposable SubscribeSafe<T>(this IObservable<T> source, Action<T> onNext,
        Action<Exception> onError, Action onCompleted)
    {
        IObserver<T> callbacks = Observer.Create<T>(onNext, onError, onCompleted);
        return source.SubscribeSafe(callbacks);
    }
}

它看起来不错吗?

c# .net stream system.reactive
1个回答
1
投票

我最初认为你的ReadToEndObservable肯定有一个bug,所以我写了这个:

/// <summary>
/// Reads <paramref name="stream"/> to its end and emits the content as chunks of up to
/// <paramref name="bufferSize"/> bytes. Each subscription gets its own buffer.
/// </summary>
/// <param name="stream">The stream to read; it is not disposed by this method.</param>
/// <param name="bufferSize">Maximum size of each emitted chunk, in bytes.</param>
/// <returns>A sequence of non-empty chunks that completes at end of stream.</returns>
public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    =>
        Observable.Defer<byte[]>(() =>
        {
            // -1 means "no read performed yet"; 0 means end of stream.
            var bytesRead = -1;
            var buffer = new byte[bufferSize];
            return
                Observable.While<byte[]>(
                    () => bytesRead != 0,
                    Observable
                        // Pass the token along so unsubscribing cancels a pending read.
                        .FromAsync(cancel => stream.ReadAsync(buffer, 0, bufferSize, cancel))
                        .Do(count =>
                        {
                            bytesRead = count;
                        })
                        // The final zero-byte read only signals the end; do not emit it.
                        .Where(count => count > 0)
                        .Select(count =>
                        {
                            // Copy out of the shared buffer, which the next read overwrites.
                            var chunk = new byte[count];
                            Array.Copy(buffer, chunk, count);
                            return chunk;
                        }));
        });

它似乎仍然没有奏效。

然后我用这个简单的代码尝试了它:

// Opens the solution file on subscription and streams it in 24-byte chunks;
// Observable.Using disposes the file stream when the sequence terminates.
Func<FileStream> openSolutionFile =
    () => File.Open(@"{path}\HttpDownloader-master\HttpDownloader-master\HttpDownloader.sln", FileMode.Open);

IObservable<byte[]> test1 = Observable.Using(
    openSolutionFile,
    fileStream => fileStream.ReadToEndObservable(24));

这段测试用我的代码可以正常工作。我又换成你的代码试了一次,同样有效。

于是我以为您尝试下载的流可能有问题。其实并没有这样的问题——只是文件大小有 555MB。

我认为你的代码没有问题,只是文件太大,导致下载超时了。

© www.soinside.com 2019 - 2024. All rights reserved.