在过去的几个月里,我一直在做一个出口项目,基本上应用程序需要从 blob 存储中获取文件,将文件统一在一个文件中,压缩成一个 zip 并上传到 blob 存储中。我把这个过程分成几步。性能非常好,整个过程都在运行,但是当我导出很多文件时,最后一步崩溃了(因为我的环境只有 15gb 的内存,而且文件比这更大)。有什么想法吗?
最后一步和代码的一些描述:
public async Task<Dictionary<string, byte[]>> DownloadManyAsync(Guid exportId)
{
var tasks = new Queue<Task>();
var files = new ConcurrentDictionary<string, byte[]>();
var container = _blobServiceClient.GetBlobContainerClient("");
var blobs = container.GetBlobs(prefix: "");
var options = BlobStorageTools.GetOptions();
foreach (var blob in blobs)
{
tasks.Enqueue(DownloadAndEnlist(container.GetBlobClient(blob.Name), files, options, exportId));
}
await Task.WhenAll(tasks);
return files.ToDictionary(x => x.Key,
x => x.Value,
files.Comparer);
}
public async Task DownloadAndEnlist(BlobClient blob, ConcurrentDictionary<string, byte[]> files, StorageTransferOptions options, Guid exportId)
{
using var memoryStream = new MemoryStream();
await blob.DownloadToAsync(memoryStream, default, options);
files.TryAdd(blob.Name, memoryStream.ToArray());
}
using var memoryStream = new MemoryTributary();
using (var archive = new ZipArchive(memoryStream, ZipArchiveMode.Create, true))
{
for (int i = files.Count - 1; i >= 0; i--)
{
var file = files.ElementAt(i);
var zipArchiveEntry = archive.CreateEntry(file.Key, CompressionLevel.Fastest);
using var zipStream = zipArchiveEntry.Open();
zipStream.Write(file.Value, 0, file.Value.Length);
files.Remove(file.Key);
}
}
public async Task<string> SaveExport(string fileName, Stream file)
{
var cloudBlockBlob = _blobClient.GetContainerReference("").GetBlockBlobReference($"{fileName}.zip");
BlockingCollection<string> blockList = new();
Queue<Task> tasks = new();
int bytesRead;
int blockNumber = 0;
if (file.Position != 0) file.Position = 0;
do
{
blockNumber++;
string blockId = $"{blockNumber:000000000}";
string base64BlockId = Convert.ToBase64String(Encoding.UTF8.GetBytes(blockId));
byte[] buffer = new byte[8000000];
bytesRead = await file.ReadAsync(buffer);
tasks.Enqueue(Task.Run(async () =>
{
await cloudBlockBlob.PutBlockAsync(base64BlockId, new MemoryStream(buffer, 0, bytesRead) { Position = 0 }, null);
blockList.Add(base64BlockId);
}));
} while (bytesRead == 8000000);
await Task.WhenAll(tasks);
await cloudBlockBlob.PutBlockListAsync(blockList);
return cloudBlockBlob.Uri.ToString();
}
我想使用 az 函数,但是函数有 15gb 内存限制,我也会有同样的问题。
如我所见,您使用了不在内存中的流,例如在磁盘中创建 de zip 文件。但我猜你不想那样做,所以你可以直接在 blob 中创建你的 zip 文件。
有点像
using (var blobStream = await blob.OpenWriteAsync())
using (var archive = new ZipArchive(blobStream, ZipArchiveMode.Create, true))
{
}
换个话题。您可能希望使用 RecyclableMemoryStreamManager 来获取代码中的内存流,以便您可以重新使用之前分配的流。
使用起来非常简单,只需创建一个 RecyclableMemoryStreamManager 的实例,然后从中获取流。
private static readonly RecyclableMemoryStreamManager manager = new RecyclableMemoryStreamManager();
using (var ms= manager.GetStream())
https://github.com/microsoft/Microsoft.IO.RecyclableMemoryStream
(编辑)
哦!我没有注意到你在做压缩之前下载了所有的流。这意味着您需要同时存储所有这些。
您可能需要下载一些,然后压缩,然后重复。
或者可以使用 ConcurrentQueue,并有一个进程下载流并将其放入队列,另一个进程获取流并压缩它。为此,您需要某种标志,上面写着“没有其他可下载,清空队列后,您就完成了”。
想法是释放已经压缩的 blob 的内存。 如果这样做,应用 ReciclableMemoryStream,并直接写入 blob(或本地磁盘),您将大大减少进程使用的内存量。而且它也会提高速度。