我找到了大量有关如何创建 zip 文件并将其上传到 Azure 存储的信息,但我正在尝试查看是否有一种好方法可以将内容添加到 blob 存储中的现有 zip 文件,而无需下载整个 blob,在内存中重建,然后再次上传。
我的用例涉及将数百个甚至数百万个项目压缩到存档中,以存储在 Azure blob 存储中以供以后下载。我有可以处理分割的代码,这样我就不会得到一个几 GB 大小的文件,但在处理大量文件时我仍然遇到内存管理问题。我试图通过在 blob 存储中创建 zip 文件并将后续文件一一添加到其中来解决此问题。我认识到这会产生额外写入的成本,这很好。
我知道如何使用追加 Blob 和块 Blob,并且我有工作代码来创建 zip 文件并上传,但我似乎不知道是否有办法做到这一点。有人设法做到这一点,或者能够确认这是不可能的吗?
由于您正在处理 zip 文件,因此将新文件添加到现有 zip 文件的唯一方法是下载 blob,将新文件添加到该 zip 文件,然后重新上传该 blob。
发现自己处于类似的情况后,我们发现以下内容确实适合我们,并且没有很大的内存开销。
这不允许更新,但使用已存储在 azure 中的文件的引用,我们可以迭代它们,拉出它们并根据需要从文件名列表中添加到此处 - 然后缓冲并处理条目而无需下载他们同时。您可以减慢整个队列的速度,以防止文件下载速度快于处理速度。
需要解决的一件事是,这就是我使用 ConcurrentQueue 的原因,创建要输入 zip 的可用字节的下载器 - 不能来自管理 ZipEntries 的不同线程/任务 - 所以这是我是如何让它工作的,通过使用下载方法制作一个 SemaphoreSlim,对结果进行排队,然后构建 ZipEntries 的任务 - 从它们中出队,并在它们可用时开始将它们添加到 ZipArchive 中。
这就是我用来创建该队列的。
ConcurrentQueue<KeyValuePair<string, byte[]>> queuedFiles = new ConcurrentQueue<KeyValuePair<string, byte[]>>();
List<Task> tasks = new List<Task>();
foreach (var storedFile in storedFileNames)
{
var t = new Task(() =>
{
try
{
getFileSemaphoreSlim.Wait();
var storedFilePath = Path.Combine(rootDir.ToUpper(), storedFile);
var blob = container.GetBlobClient(storedFilePath);
if (blob.Exists())
{
using (var blobStream = new MemoryStream())
{
blob.DownloadTo(blobStream);
var entryFilename = storedFile;
entryFilename = entryFilename.Replace("rootDir\\", "");
queuedFiles.Enqueue(new KeyValuePair<string, byte[]>(entryFilename, blobStream.ToArray()));
}
}
}
catch (Exception ex)
{
}
finally
{
getFileSemaphoreSlim.Release();
}
});
tasks.Add(t);
t.Start();
}
然后异步任务的下一部分,处理队列并创建 ZipArchive
var blobClient = container.GetBlockBlobClient(archiveName);
using (var stream = new AvoidFlushStream(await blobClient.OpenWriteAsync(true, new BlockBlobOpenWriteOptions(), CancellationToken.None)))
using (var zip = new ZipArchive(stream, ZipArchiveMode.Create, leaveOpen: true))
{
if (csvData != null)
{
var csvDataEntry = zip.CreateEntry("csv.txt");
using (var entryStream = csvDataEntry.Open())
{
await entryStream.WriteAsync(csvData, 0, csvData.Length);
await entryStream.FlushAsync();
totalBytesWritten += csvData.Length;
}
}
while (!tasks.All(x => x.IsCompleted))
{
try
{
if (queuedFiles.Count > 0)
{
while (queuedFiles.Count > 0)
{
try
{
queuedFiles.TryDequeue(out KeyValuePair<string, byte[]> result);
var entry = zip.CreateEntry(result.Key);
using (var entryStream = entry.Open())
{
await entryStream.WriteAsync(result.Value, 0, result.Value.Length);
await entryStream.FlushAsync();
totalBytesWritten += result.Value.Length;
}
}
catch (Exception ex)
{
}
}
}
await Task.Delay(500); //This allows it to wait for more queued entries
}
catch (Exception ex)
{
}
}
await stream.FlushAsync();
}
“AvoidFlushStream”类是从我在另一个人的网站上找到的方法中使用的,我现在找不到参考 - 所以我不声称从他那里拿走任何东西 - 但他概述了,在处理 ZipEntries 时在 AzureBlobStreams 中,每个条目在写入后实际上会刷新自身,这会导致 50,000 个块条目的最大值很快就被最大化..因此客户类阻止解决此问题。
class AvoidFlushStream : Stream
{
private readonly Stream source;
private bool disposed;
public AvoidFlushStream(Stream source)
{
this.source = source;
}
public override bool CanRead => source.CanRead;
public override bool CanSeek => source.CanSeek;
public override bool CanWrite => source.CanWrite;
public override long Length => source.Length;
public override long Position { get => source.Position; set => source.Position = value; }
public override void Flush()
{
//Console.WriteLine("Not gonna flush");
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public override int Read(byte[] buffer, int offset, int count)
{
return source.Read(buffer, offset, count);
}
public override long Seek(long offset, SeekOrigin origin)
{
return source.Seek(offset, origin);
}
public override void SetLength(long value)
{
source.SetLength(value);
}
public override void Write(byte[] buffer, int offset, int count)
{
source.Write(buffer, offset, count);
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return base.WriteAsync(buffer, offset, count, cancellationToken);
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return base.ReadAsync(buffer, offset, count, cancellationToken);
}
protected override void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
source.Dispose();
}
else
{
base.Dispose(false);
}
disposed = true;
}
}
}