Parallel.ForEach 返回同一文件对象两次

问题描述 投票:0回答:1

我正在尝试从 AWSS3 存储中读取大量文件。阅读它们等没有问题。但是我时不时地得到两个具有相同文件对象的线程?

我尝试保留一个

List<string>
或一个
ConcurrentBag<string>
我已处理的文件,但当两个线程同时命中时 - 两者都认为它们是第一个,但仍然得到相同的重复。

我有什么想法可以解决或防止这种情况吗?

List<S3Object> fileObjects = listFilesResponse.Result.S3Objects;
filecount += listFilesResponse.Result.KeyCount;

ParallelOptions paralleOpts = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(fileObjects, paralleOpts, fileObject =>
{
    fileRequest.Key = fileObject.Key;

    using (var fileResponse = s3Client.GetObjectAsync(fileRequest))
    using (Stream responseStream = fileResponse.Result.ResponseStream)
    using (StreamReader reader = new StreamReader(responseStream))
    {
        //do stuff with file...
    }
});
c# multithreading parallel-processing parallel.foreach
1个回答
0
投票

Parallel.ForEach
不返回任何内容。它绝对不适用于异步或 IO 调用,仅适用于数据并行性,即通过分区并使用所有核心来处理分区来处理大量内存数据。

第一个大问题是使用全局变量

fileRequest
来请求all对象。线程将覆盖彼此的更改,并且很容易最终发出相同的请求。

List<T>
也不是线程安全的,因此
Parallel.ForEach
线程对其进行的修改将导致并发异常或错误数据。

代码需要一些修复:

  • 每次都应该创建一个新的请求对象
  • AWS 方法是异步的,因此该方法也应该是异步的。阻塞异步方法只会导致问题。
  • 不要使用
    Parallel.ForEach
    ,而使用
    Parallel.ForEachAsync
    。该方法is适用于异步操作。它仍然没有返回任何结果。
  • 将数据存储在并发集合中,例如 ConcurrentQueue<> 或 ConcurrentDictionary<>。 ConcurrentBag 是一个专门的集合,用于线程本地存储。
async Task<ConcurrentQueue<ResultDTO>> MyDownloaderAsync()
{
    var results=new ConcurrentQueue<ResultDTO>();
    ...
    var fileObjects= await client.ListObjectsV2Async(request);

    await Parallel.ForEach(fileObjects, async (fileObject,_) =>{
        var fileRequest = new GetObjectRequest
        {
            BucketName = bucketName,
            Key = fileObject.Key;
        };

        using var fileResponse = await s3Client.GetObjectAsync(fileRequest);
        using var reader = new StreamReader(fileResponse.ResponseStream);
        //do stuff with file...
        // Create a `result` object
        results.Enqueue(result);
    }
    return results;
});
© www.soinside.com 2019 - 2024. All rights reserved.