无法使用并行库并行化我的代码

问题描述 投票:0回答:1

我在尝试并行化我的代码时遇到了很多麻烦。基本上,这段代码将两条新记录引入数据库。

如果您输入的记录已存在,并且处于错误状态,则将旧记录发送到历史记录后输入。如果这些记录不处于错误状态,它们将被丢弃,如果这些记录以前不存在,则直接引入它们。

为了并行化此代码,我尝试修复它,为每个线程创建一个 DBContext 和一个 UnitOfWork。但我仍然不断发现同样的错误:

在上一个操作完成之前,已在此上下文实例上启动了第二个操作。这通常是由不同线程同时使用同一个 DbContext 实例引起的。 

我看不到问题,更看不到解决方案,你能帮我吗?

ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 2;

    Parallel.ForEach(excelRawList.SuccessRows, options, (excelRow) =>
    {
        using (var dbContext = new MyContext(
        new DbContextOptionsBuilder<MyContext>()
    .UseSqlServer(_configuration.GetConnectionString("MyDB"))
            .Options))
        {
            using (var unitOfWork = new UnitOfWork(dbContext))
            {
                try
                {
                    int? canBeInserted = await service3.CanBeInserted(excelRow.Id);
    
                    // canBeIserted could be 3 values: null (there is not previous record), 0 (there is a previous record and it is not an error),
                    // Id_value (there is a previous record but it was an error)
    
                    if (canBeInserted != 0)
                    {
                        if (!excelRow.Prorroga)
                        {
                            using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(_timeoutTransaccion)))
                            {
                                unitOfWork.BeginTransaction(cts);
    
                                if (canBeInserted != null)
                                {
                                    service4.MoveToHistoric(unitOfWork, canBeInserted.Value, cts);
                                }
    
                                Entity2 entity2 = service3.Entity1ToEntity2(excelRow);
                                excelRow.entity2 = entity2;
    
                                unitOfWork.Context.Set<Entity2>().Add(entity2);
                                unitOfWork.Context.Set<Entity1>().Add(excelRow);
    
                                unitOfWork.SaveChanges(cts);
                                unitOfWork.CommitTransaction(cts);
    
                                if (entity2.Age == null)
                                {
                                   _logger.LogWarning("Age didnt find it");
                                }
                            }
                        }
                        catch (OperationCanceledException ex)
                        {
                            unitOfWork.Rollback();
                        }
                        catch (Exception ex)
                        {
                            unitOfWork.Rollback();
                        }
                  }
                  else
                  {
                       excelRow.FK2= null;
                       _service1.Add(excelRow);
                  }
             }
             else
             {
                   excelRawList.ErrorRows.Add(new ExcelError() { 
             }
          }
          catch (Exception ex)
          {
              excelRawList.ErrorRows.Add(new ExcelError()
              {
                   Message = ex.InnerException.Message
              });
          }
        }
      }
    });

    public void MoveToHistoric(UnitOfWork unitOfWork, int Id, CancellationTokenSource cts)
    {
        try
        {
            Entity1 entity1 = service1.Get(id);
            Entity2 entity2 = service2.Get(entity1.FK);
            Entity3 entity3 = _mapper.Map<Entity3>(entity1);
            Entity4 entity4 = _mapper.Map<Entity4>(entity2);
    
            unitOfWork.Context.Set<Entity1>().Remove(entity1);
            unitOfWork.Context.Set<Entity2>().Remove(entity2);
    
            await unitOfWork.SaveChangesAsyncLimite(cts);
    
            entity3.Entity4 = entity4;
            unitOfWork.Context.Set<Entity4>().Add(entity4);
            unitOfWork.Context.Set<Entity3>().Add(entity3);
    
            unitOfWork.SaveChanges(cts);
    
            return unitOfWork;
        }
        catch (Exception ex)
        {
            throw;
        }
    }

非常感谢

c# .net parallel-processing .net-8.0 parallel.foreach
1个回答
0
投票

我的猜测是

await service3.CanBeInserted(excelRow.Id)
,或其他服务类别之一,看起来像这样:

public class Service3{
    MyContext dbContext;
    IUnitOfWork unitOfWork;
    public Service3(string connectionString){
       dbContext = new MyContext(new DbContextOptionsBuilder<MyContext>()
              .UseSqlServer(_configuration.GetConnectionString(connectionString))
              .Options);
       unitOfWork = new UnitOfWork(dbContext);
   }
   public int? CanBeInserted(int id){
   ...
   }
}

在并行循环中使用时,这会导致问题,因为它将允许在同一数据库上下文上并行执行多个调用,正如错误消息所述。

我最习惯的方法是在任何服务类中每次调用创建一个工作单元对象,即

public class Service3{
    IUnitOfWorkFactory unitFactory;
    public Service3(IUnitOfWorkFactory unitFactory) => this.unitFactory = unitFactory;
   public int? CanBeInserted(int id){
       using(var unitOfWork = unitFactory.Create()){
           ...
       }
   }
}

其中

IUnitOfWorkFactory.Create
负责创建新的 DbContext 和 UnitOfWork 对象。

另一种可能更适合您的特定用例的方法是将工作单元作为参数:

public class Service3{
   public int? CanBeInserted(UnitOfWork unitOfWork, int id){
       ...
   }
}

这两个选项都应该消除从多个线程同时使用数据库上下文的可能性。

© www.soinside.com 2019 - 2024. All rights reserved.