我目前有一个 IObservable<byte[]>,它实际上是源文件的一系列数据块,由下面的 ReadToEndObservable 方法生成。
该方法把流“分块”(chunkify)为 byte[] 序列。
问题是:给定这个序列,我想把它写入目标流。换句话说,我必须把每个 byte[] 依次写入文件流,直到序列结束;同时,调用方还必须等待整个序列完成。
到目前为止,我写的代码可以工作,但我担心这不是正确的做法。处理 IObservable<byte[]> 的相关部分是 Download 方法。
// Script entry point: downloads a sample archive into the temp folder and
// prints where it was saved.
async Task Main()
{
    const string sourceUrl = "https://github.com/gus33000/MSM8994-8992-NT-ARM64-Drivers/archive/master.zip";
    var destinationPath = Path.Combine(Path.GetTempPath(), "test.zip");

    // The HttpClient is owned here; the downloader only borrows it.
    using (var httpClient = new HttpClient())
    {
        var downloader = new HttpDownloader(httpClient);
        await downloader.Download(sourceUrl, destinationPath);
    }

    Console.WriteLine("File downloaded to " + destinationPath);
}
/// <summary>
/// Downloads an HTTP resource by streaming the response body as an observable
/// sequence of byte chunks and writing each chunk, in order, to a destination stream.
/// </summary>
public class HttpDownloader
{
    // Buffer size handed to the temporary FileStream created by GetStream.
    private static readonly int BufferSize = 8192;

    private readonly HttpClient client;

    /// <summary>Creates a downloader that issues its requests through <paramref name="client"/>.</summary>
    /// <param name="client">HTTP client used for every request; it is not disposed by this class.</param>
    public HttpDownloader(HttpClient client)
    {
        this.client = client;
    }

    /// <summary>Downloads <paramref name="url"/> into the file at <paramref name="path"/>.</summary>
    /// <param name="url">Address of the resource to download.</param>
    /// <param name="path">Destination file; created, or truncated if it already exists.</param>
    /// <param name="progressObserver">Optional progress sinks; may be null.</param>
    /// <param name="timeout">
    /// Maximum number of seconds to wait for each chunk (measured from the previous
    /// chunk, or from the start for the first one) before the download fails.
    /// </param>
    public async Task Download(string url, string path, IDownloadProgress progressObserver = null, int timeout = 30)
    {
        // File.Create truncates an existing file. File.OpenWrite would keep the old
        // length, leaving stale trailing bytes when the new content is shorter.
        using (var fileStream = File.Create(path))
        {
            await Download(url, fileStream, progressObserver, timeout);
        }
    }

    // Streams the response body of `url` into `destination`, reporting progress as chunks arrive.
    private async Task Download(string url, Stream destination, IDownloadProgress progressObserver = null,
        int timeout = 30)
    {
        long? totalBytes = 0;
        long bytesWritten = 0;

        await ObservableMixin.Using(() => client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead),
                response =>
                {
                    // Fail fast on 4xx/5xx instead of saving the error page as the download.
                    response.EnsureSuccessStatusCode();

                    totalBytes = response.Content.Headers.ContentLength;
                    if (!totalBytes.HasValue)
                    {
                        // Unknown length: signal once that no meaningful fraction can be computed.
                        progressObserver?.Percentage?.OnNext(double.PositiveInfinity);
                    }

                    return ObservableMixin.Using(() => response.Content.ReadAsStreamAsync(),
                        contentStream => contentStream.ReadToEndObservable());
                })
            .Do(bytes =>
            {
                // Progress is counted when a chunk is received, just before it is written.
                bytesWritten += bytes.Length;
                if (totalBytes.HasValue)
                {
                    progressObserver?.Percentage?.OnNext((double)bytesWritten / totalBytes.Value);
                }

                progressObserver?.BytesDownloaded?.OnNext(bytesWritten);
            })
            .Timeout(TimeSpan.FromSeconds(timeout))
            .Select(bytes => Observable.FromAsync(async () =>
            {
                await destination.WriteAsync(bytes, 0, bytes.Length);
                return Unit.Default;
            }))
            // Merge(1) subscribes to one write at a time, so chunks are written in order.
            .Merge(1)
            // Awaiting an empty sequence throws InvalidOperationException; an empty
            // response body is a valid (zero-byte) download, so supply a default element.
            .DefaultIfEmpty();
    }

    /// <summary>
    /// Downloads <paramref name="url"/> into a temporary file and returns a stream over it,
    /// positioned at the beginning. The file is deleted when the stream is closed.
    /// </summary>
    /// <param name="url">Address of the resource to download.</param>
    /// <param name="progress">Optional progress sinks; may be null.</param>
    /// <param name="timeout">Maximum number of seconds to wait for each chunk.</param>
    /// <returns>A readable stream containing the downloaded content; the caller must dispose it.</returns>
    public async Task<Stream> GetStream(string url, IDownloadProgress progress = null, int timeout = 30)
    {
        // GetTempFileName already returns a full path, so no Path.Combine is needed.
        var tmpFile = Path.GetTempFileName();
        var stream = File.Create(tmpFile, BufferSize, FileOptions.DeleteOnClose);
        try
        {
            await Download(url, stream, progress, timeout);

            // Rewind so the caller reads the content rather than starting at end-of-file.
            stream.Position = 0;
            return stream;
        }
        catch
        {
            // Don't leak the handle (or the temp file) when the download fails.
            stream.Dispose();
            throw;
        }
    }
}
/// <summary>
/// Progress sinks that a download pushes notifications into.
/// </summary>
public interface IDownloadProgress
{
    /// <summary>
    /// Receives the completed fraction of the download. HttpDownloader pushes
    /// bytes-so-far divided by the response's content length, or
    /// <see cref="double.PositiveInfinity"/> when the content length is unknown.
    /// </summary>
    ISubject<double> Percentage { get; set; }

    /// <summary>
    /// Receives the running total of bytes received so far.
    /// </summary>
    ISubject<long> BytesDownloaded { get; set; }
}
/// <summary>
/// Rx helpers for resources that are acquired asynchronously.
/// </summary>
public static class ObservableMixin
{
    /// <summary>
    /// Variant of Observable.Using whose resource is produced by an asynchronous factory.
    /// </summary>
    /// <param name="resourceFactoryAsync">Asynchronously creates the disposable resource.</param>
    /// <param name="observableFactory">Builds the sequence that consumes the resource.</param>
    /// <returns>The sequence produced by <paramref name="observableFactory"/>, scoped to the resource via Observable.Using.</returns>
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable
    {
        var acquisition = Observable.FromAsync(resourceFactoryAsync);

        // Once the resource is available, hand it to Observable.Using so it is
        // tied to the lifetime of the inner sequence.
        return acquisition.SelectMany(
            acquired => Observable.Using(() => acquired, observableFactory));
    }
}
/// <summary>
/// Rx helpers that expose reads from a <see cref="Stream"/> as observable sequences.
/// </summary>
public static class StreamExtensions
{
    internal const int defaultBufferSize = 4096;

    /// <summary>Reads the stream to its end in chunks of up to <see cref="defaultBufferSize"/> bytes.</summary>
    public static IObservable<byte[]> ReadToEndObservable(this Stream stream)
        => stream.ReadToEndObservable(new byte[defaultBufferSize]);

    /// <summary>Reads the stream to its end in chunks of up to <paramref name="bufferSize"/> bytes.</summary>
    public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
        => stream.ReadToEndObservable(new byte[bufferSize]);

    /// <summary>
    /// Repeatedly reads into <paramref name="buffer"/>, emitting a copy of each non-empty
    /// chunk, and completes after the first read that yields no data.
    /// </summary>
    internal static IObservable<byte[]> ReadToEndObservable(this Stream stream, byte[] buffer)
    {
        return Observable.Create<byte[]>(observer =>
        {
            // Holds the subscription to whichever single read is currently in flight.
            var pendingRead = new SerialDisposable();

            var readLoop = Scheduler.Immediate.Schedule(readNext =>
            {
                var reachedEnd = false;

                pendingRead.SetDisposableIndirectly(() =>
                    stream.ReadObservable(buffer).SubscribeSafe(
                        chunk =>
                        {
                            if (chunk.Length > 0)
                            {
                                observer.OnNext(chunk);
                            }
                            else
                            {
                                // A zero-length chunk marks the end of the stream.
                                reachedEnd = true;
                            }
                        },
                        observer.OnError,
                        () =>
                        {
                            if (reachedEnd)
                            {
                                observer.OnCompleted();
                                return;
                            }

                            // This read delivered data, so schedule another one.
                            readNext();
                        }));
            });

            return new CompositeDisposable(pendingRead, readLoop);
        });
    }

    /// <summary>
    /// Performs one read into <paramref name="buffer"/> and emits a freshly allocated array
    /// holding exactly the bytes read (empty when nothing was read).
    /// </summary>
    internal static IObservable<byte[]> ReadObservable(this Stream stream, byte[] buffer)
    {
        return stream.ReadObservable(buffer, 0, buffer.Length).Select(bytesRead =>
        {
            if (bytesRead <= 0)
            {
                return new byte[0];
            }

            // Copy out of the shared buffer so later reads cannot overwrite an emitted chunk.
            var chunk = new byte[bytesRead];
            Array.Copy(buffer, chunk, bytesRead);
            return chunk;
        });
    }

    /// <summary>
    /// Wraps a single <see cref="Stream.ReadAsync(byte[], int, int, System.Threading.CancellationToken)"/>
    /// call as an observable that yields the number of bytes read.
    /// </summary>
    public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer, int offset, int count)
        => Observable.StartAsync(token => stream.ReadAsync(buffer, offset, count, token));
}
/// <summary>
/// Helpers for <c>SerialDisposable</c>.
/// </summary>
public static class SerialDisposableExtensions
{
    /// <summary>
    /// Installs a placeholder into <paramref name="disposable"/> and only then runs
    /// <paramref name="factory"/>, storing its result inside the placeholder.
    /// </summary>
    /// <param name="disposable">The serial disposable whose current slot is replaced.</param>
    /// <param name="factory">Produces the disposable to place in the new slot.</param>
    public static void SetDisposableIndirectly(this SerialDisposable disposable, Func<IDisposable> factory)
    {
        // NOTE(review): the placeholder is installed before the factory runs, presumably so
        // that a factory which re-enters and replaces the serial slot is not overwritten
        // afterwards by its own result. Confirm against the callers.
        var slot = new SingleAssignmentDisposable();
        disposable.Disposable = slot;

        var produced = factory();
        slot.Disposable = produced;
    }
}
/// <summary>
/// Delegate-based convenience overload for <c>SubscribeSafe</c>.
/// </summary>
public static class SafeObservableExtensions
{
    /// <summary>
    /// Wraps the three callbacks in an observer and subscribes it through the
    /// observer-based <c>SubscribeSafe</c> overload.
    /// </summary>
    /// <returns>The subscription handle.</returns>
    public static IDisposable SubscribeSafe<T>(this IObservable<T> source, Action<T> onNext,
        Action<Exception> onError, Action onCompleted)
    {
        var callbacks = Observer.Create<T>(onNext, onError, onCompleted);
        return source.SubscribeSafe(callbacks);
    }
}
这段代码看起来合理吗?
我最初认为你的 ReadToEndObservable 肯定有 bug,所以我写了下面这个版本:
/// <summary>
/// Reads <paramref name="stream"/> to its end, emitting each non-empty chunk of up to
/// <paramref name="bufferSize"/> bytes as a freshly allocated array.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="bufferSize">Maximum size, in bytes, of each emitted chunk.</param>
/// <returns>A sequence of chunks that completes after a read returns zero bytes.</returns>
public static IObservable<byte[]> ReadToEndObservable(this Stream stream, int bufferSize)
    =>
        // Defer so every subscription gets its own buffer and read counter.
        Observable.Defer<byte[]>(() =>
        {
            var bytesRead = -1;
            var bytes = new byte[bufferSize];
            return
                Observable.While<byte[]>(
                    () => bytesRead != 0,
                    Observable
                        .FromAsync(() => stream.ReadAsync(bytes, 0, bufferSize))
                        .Do(x =>
                        {
                            // Record the count first so the While condition sees end-of-stream.
                            bytesRead = x;
                        })
                        // Drop the zero-byte read that only signals end-of-stream, so
                        // consumers never receive an empty chunk.
                        .Where(x => x > 0)
                        .Select(x =>
                        {
                            // Block copy instead of enumerating byte-by-byte through LINQ.
                            var chunk = new byte[x];
                            Array.Copy(bytes, chunk, x);
                            return chunk;
                        }));
        });
但它似乎仍然不起作用。
然后我用下面这段简单的代码进行了测试:
// Sanity check against a local file: open it, read it to the end in 24-byte
// chunks, and let Observable.Using own the file handle.
IObservable<byte[]> test1 = Observable.Using(
    () => File.Open(@"{path}\HttpDownloader-master\HttpDownloader-master\HttpDownloader.sln", FileMode.Open),
    file => file.ReadToEndObservable(24));
这段测试在我的代码下可以正常工作。我又用你的代码试了一遍,同样有效。
我曾认为你要下载的流可能有问题,但其实并没有这样的问题——只是文件大小有 555MB。
我认为你的代码没有问题,只是文件太大,导致超时了。