我正在尝试从 AWSS3 存储中读取大量文件。阅读它们等没有问题。但是我时不时地得到两个具有相同文件对象的线程?
我尝试保留一个
List<string>
或一个ConcurrentBag<string>
我已处理的文件,但当两个线程同时命中时 - 两者都认为它们是第一个,但仍然得到相同的重复。
我有什么想法可以解决或防止这种情况吗?
List<S3Object> fileObjects = listFilesResponse.Result.S3Objects;
filecount += listFilesResponse.Result.KeyCount;
ParallelOptions paralleOpts = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(fileObjects, paralleOpts, fileObject =>
{
fileRequest.Key = fileObject.Key;
using (var fileResponse = s3Client.GetObjectAsync(fileRequest))
using (Stream responseStream = fileResponse.Result.ResponseStream)
using (StreamReader reader = new StreamReader(responseStream))
{
//do stuff with file...
}
});
Parallel.ForEach
不返回任何内容。它绝对不适用于异步或 IO 调用,仅适用于数据并行性,即通过分区并使用所有核心来处理分区来处理大量内存数据。
第一个大问题是使用全局变量
fileRequest
来请求all对象。线程将覆盖彼此的更改,并且很容易最终发出相同的请求。
List<T>
也不是线程安全的,因此Parallel.ForEach
线程对其进行的修改将导致并发异常或错误数据。
代码需要一些修复:
Parallel.ForEach
,而使用 Parallel.ForEachAsync
。该方法is适用于异步操作。它仍然没有返回任何结果。async Task<ConcurrentQueue<ResultDTO>> MyDownloaderAsync()
{
var results=new ConcurrentQueue<ResultDTO>();
...
var fileObjects= await client.ListObjectsV2Async(request);
await Parallel.ForEach(fileObjects, async (fileObject,_) =>{
var fileRequest = new GetObjectRequest
{
BucketName = bucketName,
Key = fileObject.Key;
};
using var fileResponse = await s3Client.GetObjectAsync(fileRequest);
using var reader = new StreamReader(fileResponse.ResponseStream);
//do stuff with file...
// Create a `result` object
results.Enqueue(result);
}
return results;
});