我有一些问题,关于如何 SaveChangesAsync()
和 BeginTransaction()
+ transaction.Commit()
工作。
我的团队有一个.NET Core worker,它从Microsoft EventHub接收事件,并通过EF Core 3将数据保存到SQL服务器中。其中一个事件类型有很多数据,所以我们创建了几个表,将数据分开,然后保存到这些表中。子表引用父表的 id
列(FK_Key)。在某些条件下,DB中的一些数据必须在保存新数据之前被删除,所以我们删除->upsert数据。
为了将数据保存到DB中,我们调用 dbContext.Database.BeginTransaction()
和 transaction.Commit()
. 当我们运行worker的时候,我们会出现死锁异常,比如说 Transaction (Process ID 71) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
我发现,其中一个 .BatchDeleteAsync()
在 PurgeDataInChildTables()
或其一 BulkInsertOrUpdateAsync()
在 Upsert()
抛出一个死锁异常(每次运行worker时都会发生变化)。
下面是代码。
public async Task DeleteAndUpsert(List<MyEntity> entitiesToDelete, List<MyEntity> entitiesToUpsert)
{
if (entitiesToDelete.Any())
await myRepository.Delete(entitiesToDelete);
if (entitiesToUpsert.Any())
await myRepository.Upsert(entitiesToUpsert);
}
public override async Task Upsert(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
using (var transaction = dbContext.Database.BeginTransaction())
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkInsertOrUpdateAsync(entities);
// tables that depends on the parent table (FK_Key)
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child1>(x => x.Id).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child2>(x => x.Id).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child3>(x => x.Id).ToList());
transaction.Commit();
}
}
}
public override async Task Delete(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
using (var transaction = dbContext.Database.BeginTransaction())
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkDeleteAsync(entities);
transaction.Commit();
}
}
}
private async Task PurgeDataInChildTables(IList<MyEntity> entities, MyDbContext dbContext)
{
var ids = entities.Select(x => x.Id).ToList();
await dbContext.Child1.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
await dbContext.Child2.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
await dbContext.Child3.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
}
当worker启动时,它创建了4个线程 它们都会向同一个表上传数据(也会删除)。所以,我认为当一个线程启动一个事务,另一个线程启动另一个事务(或类似的东西...),然后尝试向上转发到(或从)子表中删除时,就会发生死锁。我尝试了一些东西来解决这个问题,发现当我删除了 BeginTransaction()
并使用 SaveChangesAsync()
代替。
这是修改后的代码。
public override async Task Upsert(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkInsertOrUpdateAsync(entities);
// tables that depends on the parent table (FK_Key)
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child1).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child2).ToList());
await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child3).ToList());
await dbContext.SaveChangesAsync();
}
}
public override async Task Delete(IList<MyEntity> entities)
{
using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
{
await PurgeDataInChildTables(entities, dbContext);
await dbContext.BulkDeleteAsync(entities);
await dbContext.SaveChangesAsync();
}
}
死锁在worker启动后30秒内发生,但在我修改代码后2~3分钟内没有发生,所以我认为问题已经解决了,我想如果我延长worker的运行时间可能还会发生。
最后,我的问题是这样的。
BeginTransaction()
+ .Commit()
会发生死锁,但当我使用 SaveChangesAsync()
. 为什么会这样?不查数据库的profiling session,很难说得准确。那里需要查的是采取了什么样的锁(哪里是 shared
以及它在哪里 exclusive
或 update
)而当交易是 其实 打开。我将描述一个理论上的行为,需要用实际的数据库剖析来证明。
当你用Database.BeginTransaction()将一切包裹起来的时候: 隔离级别不是由EF设置的,它使用数据库默认的隔离级别。如果是 Microsoft SQL Server
它将 Read committed
. 这个隔离级别说的是,并发事务可以读取数据,但如果有正在进行的修改,其他事务将等待它完成,即使他们想只是读取。事务将被保持在 Commit()
被调用。
当你没有明确指定事务时: 选择发言和 SaveChangesAsync
将导致单独的事务,其隔离级别默认为数据库。事务的持有时间不会超过它所需要的时间:例如,在的情况下。SaveChangesAsync
当所有变化被写入时,它将在那里,从方法被调用时开始。
事务(进程ID 71)在锁资源上与另一个进程发生了死锁,并被选为死锁受害者。重新运行该事务。
当有几个事务试图获得对某个资源的访问权,并且其中一个事务试图读取数据,另一个事务试图修改时,就会出现这个消息。在这种情况下,为了避免死锁,数据库会尝试杀死一个需要较少资源量的事务来回滚。在你的情况下--这是一个试图读取的事务。读取在回滚方面是轻量级的。
总结一下。 当你有一个巨大的锁,长时间地锁住一个资源的时候, 它就会阻止其他工作者访问这个资源,因为当其他工作者试图读取的时候,数据库就会杀死他们的事务。var ids = entities.Select(x => x.Id).ToList();
观点。当你重写代码时,你摆脱了长锁。更多的是,我可以从文档中看到,以 BulkInsert或UpdateAsync。,这个扩展在每次调用时使用内部事务,不影响也不涉及EF上下文。如果是这样,那么就意味着实际的事务活甚至比一次调用更少。SaveChangesAsync
当数据改变时,不是用扩展,而是用通常的EF方式。