附加到 Azure blob zip 文件

问题描述 投票:0回答:2

我找到了大量有关如何创建 zip 文件并将其上传到 Azure 存储的信息,但我正在尝试查看是否有一种好方法可以将内容添加到 blob 存储中的现有 zip 文件,而无需下载整个 blob,在内存中重建,然后再次上传。

我的用例涉及将数百个甚至数百万个项目压缩到存档中,以存储在 Azure blob 存储中以供以后下载。我有可以处理分割的代码,这样我就不会得到一个几 GB 大小的文件,但在处理大量文件时我仍然遇到内存管理问题。我试图通过在 blob 存储中创建 zip 文件并将后续文件一一添加到其中来解决此问题。我认识到这会产生额外写入的成本,这很好。

我知道如何使用追加 Blob 和块 Blob,并且我有工作代码来创建 zip 文件并上传,但我似乎不知道是否有办法做到这一点。有人设法做到这一点,或者能够确认这是不可能的吗?

c# azure zip azure-blob-storage
2个回答
1
投票

由于您正在处理 zip 文件,因此将新文件添加到现有 zip 文件的唯一方法是下载 blob,将新文件添加到该 zip 文件,然后重新上传该 blob。


0
投票

发现自己处于类似的情况后,我们发现以下内容确实适合我们,并且没有很大的内存开销。

这不允许更新,但使用已存储在 azure 中的文件的引用,我们可以迭代它们,拉出它们并根据需要从文件名列表中添加到此处 - 然后缓冲并处理条目而无需下载他们同时。您可以减慢整个队列的速度,以防止文件下载速度快于处理速度。

需要解决的一件事是,这就是我使用 ConcurrentQueue 的原因,创建要输入 zip 的可用字节的下载器 - 不能来自管理 ZipEntries 的不同线程/任务 - 所以这是我是如何让它工作的,通过使用下载方法制作一个 SemaphoreSlim,对结果进行排队,然后构建 ZipEntries 的任务 - 从它们中出队,并在它们可用时开始将它们添加到 ZipArchive 中。

这就是我用来创建该队列的。

ConcurrentQueue<KeyValuePair<string, byte[]>> queuedFiles = new ConcurrentQueue<KeyValuePair<string, byte[]>>();
List<Task> tasks = new List<Task>();
foreach (var storedFile in storedFileNames)
{
    var t = new Task(() =>
    {
        try
        {
            getFileSemaphoreSlim.Wait();

            var storedFilePath = Path.Combine(rootDir.ToUpper(), storedFile);
            var blob = container.GetBlobClient(storedFilePath);

            if (blob.Exists())
            {
                
                using (var blobStream = new MemoryStream())
                {
                    blob.DownloadTo(blobStream);

                    var entryFilename = storedFile;
                    entryFilename = entryFilename.Replace("rootDir\\", "");

                    queuedFiles.Enqueue(new KeyValuePair<string, byte[]>(entryFilename, blobStream.ToArray()));
                }
            }
        }
        catch (Exception ex)
        {

        }
        finally
        {
            getFileSemaphoreSlim.Release();
        }
    });

    tasks.Add(t);
    t.Start();
}

然后异步任务的下一部分,处理队列并创建 ZipArchive

var blobClient = container.GetBlockBlobClient(archiveName);
using (var stream = new AvoidFlushStream(await blobClient.OpenWriteAsync(true, new BlockBlobOpenWriteOptions(), CancellationToken.None)))
using (var zip = new ZipArchive(stream, ZipArchiveMode.Create, leaveOpen: true))
{
    if (csvData != null)
    {
        var csvDataEntry = zip.CreateEntry("csv.txt");
        using (var entryStream = csvDataEntry.Open())
        {
            await entryStream.WriteAsync(csvData, 0, csvData.Length);
            await entryStream.FlushAsync();
            totalBytesWritten += csvData.Length;
        }
    }

    while (!tasks.All(x => x.IsCompleted))
    {
        try
        {
            if (queuedFiles.Count > 0)
            {
                while (queuedFiles.Count > 0)
                {
                    try
                    {
                        queuedFiles.TryDequeue(out KeyValuePair<string, byte[]> result);
                        var entry = zip.CreateEntry(result.Key);
                        using (var entryStream = entry.Open())
                        {
                            await entryStream.WriteAsync(result.Value, 0, result.Value.Length);
                            await entryStream.FlushAsync();
                            totalBytesWritten += result.Value.Length;
                        }
                    }
                    catch (Exception ex)
                    {

                    }
                }
            }

            await Task.Delay(500); //This allows it to wait for more queued entries
        }
        catch (Exception ex)
        {

        }
    }

    await stream.FlushAsync();
}

“AvoidFlushStream”类是从我在另一个人的网站上找到的方法中使用的,我现在找不到参考 - 所以我不声称从他那里拿走任何东西 - 但他概述了,在处理 ZipEntries 时在 AzureBlobStreams 中,每个条目在写入后实际上会刷新自身,这会导致 50,000 个块条目的最大值很快就被最大化..因此客户类阻止解决此问题。

class AvoidFlushStream : Stream
{
    private readonly Stream source;
    private bool disposed;
    public AvoidFlushStream(Stream source)
    {
        this.source = source;
    }

    public override bool CanRead => source.CanRead;

    public override bool CanSeek => source.CanSeek;

    public override bool CanWrite => source.CanWrite;

    public override long Length => source.Length;

    public override long Position { get => source.Position; set => source.Position = value; }

    public override void Flush()
    {
        //Console.WriteLine("Not gonna flush");
    }

    public override Task FlushAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return source.Read(buffer, offset, count);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return source.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        source.SetLength(value);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        source.Write(buffer, offset, count);
    }

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return base.WriteAsync(buffer, offset, count, cancellationToken);
    }

    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return base.ReadAsync(buffer, offset, count, cancellationToken);
    }

    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                source.Dispose();
            }
            else
            {
                base.Dispose(false);
            }
            disposed = true;
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.