我正在尝试尽快将大量 blob 文件上传到 Azure 存储。我正在使用这段代码尝试以 50 个为一组进行多线程上传,以便更快地上传:
//ASYNC
var semaphore = new SemaphoreSlim(50);
var tasks = new List<Task>();
await semaphore.WaitAsync();
foreach (var callObj in source.Skip(curBatch * batchSize).Take(batchSize))
{
tasks.Add(Task.Run(async () =>
{
try
{
var name = ...;
blob = blobContainerForCalls.GetBlockBlobReference(name);
await blob.UploadTextAsync(JsonConvert.SerializeObject(data));
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
过去几天一直运行良好。然而,从今天开始,这些上传开始生成此错误:
The MD5 value specified in the request did not match with the MD5 value calculated by the server.
如果我将其更改为仅同步上传每个,则效果很好:
//SYNC
foreach (var callObj in source.Skip(curBatch * batchSize).Take(batchSize))
{
var name = ...;
blob = blobContainerForCalls.GetBlockBlobReference(name);
await blob.UploadTextAsync(JsonConvert.SerializeObject(data));
}
我的
ASYNC
代码是否有问题会导致这些 MD5 错误?
我认为您代码中的信号量没有达到其预期目的,因为您的
await semaphore.WaitAsync()
位于循环之外。
您应该等待循环迭代获取信号量,然后再添加新任务。
另一个潜在问题是
blob
。目前尚不清楚您在哪里声明了这一点。在不知情的情况下,我猜想单独的线程可能会互相干扰。
尝试进行以下更新:
var semaphore = new SemaphoreSlim(50);
var tasks = new List<Task>();
foreach (var callObj in source.Skip(curBatch * batchSize).Take(batchSize))
{
await semaphore.WaitAsync(); // Acquire semaphore before starting each task
tasks.Add(Task.Run(async () =>
{
// Create local instance of CloudBlockBlob
CloudBlockBlob blob;
try
{
var name = ...;
blob = blobContainerForCalls.GetBlockBlobReference(name);
await blob.UploadTextAsync(JsonConvert.SerializeObject(data));
}
finally
{
if (blob != null)
blob.Dispose(); // Dispose of the CloudBlockBlob
semaphore.Release(); // Release semaphore after task completes
}
}));
}
await Task.WhenAll(tasks);
如果问题仍然存在,或者似乎是间歇性的,请尝试减少信号量容量。 50 个并发线程可能会导致某种限制,具体取决于运行代码的机器的规格。