我间歇性地收到
InvalidOperationException
“打开与此命令关联的 DataReader,必须先关闭该命令”
这是一段太长的关键任务代码,它是一个存储过程,可能仍然应该是,但几年前我将其转换为 Dapper 调用,并使用事务在其中一部分失败时放弃全部代码。
我已经看到了几个像here和here这样的SO问题,它们是用
ToList()
解决的,但是,我在查询结果上使用AsList()
,它只调用对Azure表存储的记录器调用和for在参数列表上循环,所以不认为这些应用在这里。
我还使用 EF6 DBContext 连接,而不是创建标准 SqlConnection,这可能只是解决此问题,但不希望在不首先了解问题的情况下进行更改。
此方法每天运行数百次,每天都不会失败,但当它抛出此异常时,客户将被锁定,无法在至少 15 分钟内重新运行此方法。
public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
{
var conn = _context.Database.Connection;
conn.Open();
using (var transaction = _context.Database.BeginTransaction())
{
try
{
var checks = await conn.QueryMultipleAsync(@"
SELECT Id FROM Customer
WHERE Id = @customerId
SELECT Id FROM Job WHERE Id = @jobId
", new { customerId, jobId }, transaction.UnderlyingTransaction);
var customer = (await checks.ReadAsync<int?>()).SingleOrDefault();
if (customer == null)
{
throw new ServiceNotFoundException($"Customer {customerId} not found");
}
var job = (await checks.ReadAsync<JobDto>()).SingleOrDefault();
if (job == null)
{
throw new ServiceNotFoundException($"Job {jobId} not found");
}
var workItemId = (await conn.QueryAsync<int>(@"
DECLARE @WorkItemId int
INSERT WorkItem (CustomerId, JobId, JobDate)
VALUES (@customerId, @job, GETDATE())
SELECT @WorkItemId = SCOPE_IDENTITY()
IF @CustomerId > 0
UPDATE Customer SET LatestActionDate = GETDATE() WHERE Id = @CustomerId
SELECT @WorkItemId
", new { customerId, jobId }, transaction.UnderlyingTransaction)).SingleOrDefault();
await _auditLogger.LogJob(jobId, "Job created", user);
for (int i = 0; i < tasks.Sum(o => o.ItemCount * o.Required); i++)
{
var taskId = (await conn.QueryAsync<int>(@"
INSERT Task (JobId, Cost,) VALUES (@jobId, @Cost)
SELECT Id FROM Task WHERE Id = CAST(SCOPE_IDENTITY() as int)
", new { jobId, job.Cost }, transaction.UnderlyingTransaction));
}
await conn.ExecuteAsync(@"
//do another insert here
", new { jobId }, transaction.UnderlyingTransaction);
var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required
}).ToDataTable();
var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
--Complex cte insert that returns some details to log
", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT")
}, transaction.UnderlyingTransaction)).AsList();
foreach(var task in allocatedTasks)
{
await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with `enter code here`{task.ItemCount} items");
}
transaction.Commit();
return workItemId;
}
catch
{
transaction.Rollback();
throw;
}
}
}
您在
using
上缺少 checks
,它为读者提供了参考。这必须使用大括号才能放置在正确的位置。
更多问题:
QueryAsync
位中使用 QueryFirstAsync
而不是 taskId
。OUTPUT
而不是单独的 SELECT
。catch
和 Rollback
,using
会做到这一点。using
,您应该创建一个新连接,而不是使用 EF Core 中的连接。using
语句,以及 ?? throw
用于空检查。AsList
,它仅检查 List<T>
,不会转换且不会处置。public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
{
await using var conn = CreateNewConnection();
await conn.OpenAsync();
await using var transaction = conn.BeginTransactionAsync();
int customer, JobDto job;
await (using var checks = await conn.QueryMultipleAsync(@"
SELECT Id FROM Customer
WHERE Id = @customerId;
SELECT Id FROM Job WHERE Id = @jobId;
", new { customerId, jobId }, transaction))
{
customer = (await checks.ReadAsync<int?>()).SingleOrDefault()
?? throw new ServiceNotFoundException($"Customer {customerId} not found");
job = (await checks.ReadAsync<JobDto>()).SingleOrDefault()
?? throw new ServiceNotFoundException($"Job {jobId} not found");
}
var workItemId = (await conn.QueryAsync<int>(@"
INSERT WorkItem (CustomerId, JobId, JobDate)
OUTPUT inserted.Id
VALUES (@customerId, @job, GETDATE());
IF @CustomerId > 0
UPDATE Customer
SET LatestActionDate = GETDATE()
WHERE Id = @CustomerId;
", new { customerId, jobId }, transaction)).SingleOrDefault();
await _auditLogger.LogJob(jobId, "Job created", user);
for (int i = 0; i < tasks.Sum(o => o.ItemCount * o.Required); i++)
{
var taskId = (await conn.QueryAsync<int>(@"
INSERT Task (JobId, Cost)
OUTPUT inserted.Id
VALUES (@jobId, @Cost);
", new { jobId, job.Cost }, transaction));
// do something with taskId
}
await conn.ExecuteAsync(@"
//do another insert here
", new { jobId }, transaction.UnderlyingTransaction);
var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required
}).ToDataTable();
var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
--Complex cte insert that returns some details to log
", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT")
}, transaction.UnderlyingTransaction)).ToListAsync();
foreach(var task in allocatedTasks)
{
await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with {task.ItemCount} items");
}
await transaction.CommitAsync();
return workItemId;
}
您还可以使用
taskId
将整个 GENERATE_SERIES
循环替换为单个插入,或者在旧版本上您可以使用 数字函数。
var taskIdList = (await conn.QueryAsync<int>(@"
INSERT Task (JobId, Cost)
OUTPUT inserted.Id
SELECT @jobId, @Cost
FROM GENERATE_SERIES(1, @Count);
", new { jobId, job.Cost, Count = tasks.Sum(o => o.ItemCount * o.Required) }, transaction)).ToListAsync();
// do something with taskId
根据您的精简代码很难准确判断,但您可能可以在单个 SQL 批处理中执行整套命令。