带有事务的 Dapper 会抛出“打开与此命令关联的 DataReader,必须首先关闭该命令”

问题描述 投票:0回答:1

我间歇性地收到

InvalidOperationException
“打开与此命令关联的 DataReader,必须先关闭该命令”

这是一段太长的关键任务代码,它是一个存储过程,可能仍然应该是,但几年前我将其转换为 Dapper 调用,并使用事务在其中一部分失败时放弃全部代码。

我已经看到了几个像herehere这样的SO问题,它们是用

ToList()
解决的,但是,我在查询结果上使用
AsList()
,它只调用对Azure表存储的记录器调用和for在参数列表上循环,所以不认为这些应用在这里。

我还使用 EF6 DBContext 连接,而不是创建标准 SqlConnection,这可能只是解决此问题,但不希望在不首先了解问题的情况下进行更改。

此方法每天运行数百次,每天都不会失败,但当它抛出此异常时,客户将被锁定,无法在至少 15 分钟内重新运行此方法。

public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
{
    var conn = _context.Database.Connection;
    conn.Open();

    using (var transaction = _context.Database.BeginTransaction())
    {
        try
        {
            var checks = await conn.QueryMultipleAsync(@" 
    SELECT Id FROM Customer 
    WHERE Id = @customerId
    SELECT Id FROM Job WHERE Id = @jobId
            ", new { customerId, jobId }, transaction.UnderlyingTransaction);

            var customer = (await checks.ReadAsync<int?>()).SingleOrDefault();
            if (customer == null)
            {
                throw new ServiceNotFoundException($"Customer {customerId} not found");
            }

            var job = (await checks.ReadAsync<JobDto>()).SingleOrDefault();
            if (job == null)
            {
                throw new ServiceNotFoundException($"Job {jobId} not found");
            }

            var workItemId = (await conn.QueryAsync<int>(@"
    DECLARE @WorkItemId int
    INSERT WorkItem (CustomerId, JobId, JobDate)
    VALUES (@customerId, @job, GETDATE())
    SELECT @WorkItemId = SCOPE_IDENTITY()
    IF @CustomerId > 0
        UPDATE Customer SET LatestActionDate = GETDATE() WHERE Id = @CustomerId
    SELECT @WorkItemId
            ", new { customerId, jobId }, transaction.UnderlyingTransaction)).SingleOrDefault();

            await _auditLogger.LogJob(jobId, "Job created", user);

            for (int i = 0; i < tasks.Sum(o => o.ItemCount * o.Required); i++)
            {
                var taskId = (await conn.QueryAsync<int>(@"
    INSERT Task (JobId, Cost,) VALUES (@jobId, @Cost)
    SELECT Id FROM Task WHERE Id = CAST(SCOPE_IDENTITY() as int)
                ", new { jobId, job.Cost }, transaction.UnderlyingTransaction));
            }

            await conn.ExecuteAsync(@"
    //do another insert here
                        ", new { jobId }, transaction.UnderlyingTransaction);

            var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required 
    }).ToDataTable();

            var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
    --Complex cte insert that returns some details to log

            ", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT") 
    }, transaction.UnderlyingTransaction)).AsList();

            foreach(var task in allocatedTasks)
            {
                await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with `enter code here`{task.ItemCount} items");
            }

            transaction.Commit();

            return workItemId;
        }
        catch
        {
            transaction.Rollback();
            throw;
        }
    }
}
c# sql-server entity-framework-6 dapper
1个回答
0
投票

您在

using
上缺少
checks
,它为读者提供了参考。这必须使用大括号才能放置在正确的位置。

更多问题:

  • 您在
    QueryAsync
    位中使用
    QueryFirstAsync
    而不是
    taskId
  • 您可以在同一部分中使用
    OUTPUT
    而不是单独的
    SELECT
  • 您不需要
    catch
    Rollback
    using
    会做到这一点。
  • 该连接还需要一个
    using
    ,您应该创建一个新连接,而不是使用 EF Core 中的连接。
  • 打开连接并异步创建事务。
  • 使用更短的语法,例如尽可能不带大括号的
    using
    语句,以及
    ?? throw
    用于空检查。
  • 请勿使用
    AsList
    ,它仅检查
    List<T>
    ,不会转换且不会处置。
public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
{
    await using var conn = CreateNewConnection();
    await conn.OpenAsync();

    await using var transaction = conn.BeginTransactionAsync();

    int customer, JobDto job;
    await (using var checks = await conn.QueryMultipleAsync(@" 
    SELECT Id FROM Customer 
    WHERE Id = @customerId;
    SELECT Id FROM Job WHERE Id = @jobId;
            ", new { customerId, jobId }, transaction))
    {
        customer = (await checks.ReadAsync<int?>()).SingleOrDefault()
            ?? throw new ServiceNotFoundException($"Customer {customerId} not found");

        job = (await checks.ReadAsync<JobDto>()).SingleOrDefault()
            ?? throw new ServiceNotFoundException($"Job {jobId} not found");
    }

    var workItemId = (await conn.QueryAsync<int>(@"
    INSERT WorkItem (CustomerId, JobId, JobDate)
    OUTPUT inserted.Id
    VALUES (@customerId, @job, GETDATE());

    IF @CustomerId > 0
        UPDATE Customer
        SET LatestActionDate = GETDATE()
        WHERE Id = @CustomerId;
            ", new { customerId, jobId }, transaction)).SingleOrDefault();

    await _auditLogger.LogJob(jobId, "Job created", user);

    for (int i = 0; i < tasks.Sum(o => o.ItemCount * o.Required); i++)
    {
        var taskId = (await conn.QueryAsync<int>(@"
    INSERT Task (JobId, Cost)
    OUTPUT inserted.Id
    VALUES (@jobId, @Cost);
        ", new { jobId, job.Cost }, transaction));

        // do something with taskId
    }

    await conn.ExecuteAsync(@"
    //do another insert here
                        ", new { jobId }, transaction.UnderlyingTransaction);

    var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required 
    }).ToDataTable();

    var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
    --Complex cte insert that returns some details to log

            ", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT") 
    }, transaction.UnderlyingTransaction)).ToListAsync();

    foreach(var task in allocatedTasks)
    {
        await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with {task.ItemCount} items");
    }

    await transaction.CommitAsync();

    return workItemId;
}

您还可以使用

taskId
将整个
GENERATE_SERIES
循环替换为单个插入,或者在旧版本上您可以使用 数字函数

var taskIdList = (await conn.QueryAsync<int>(@"
    INSERT Task (JobId, Cost)
    OUTPUT inserted.Id
    SELECT @jobId, @Cost
    FROM GENERATE_SERIES(1, @Count);
        ", new { jobId, job.Cost, Count = tasks.Sum(o => o.ItemCount * o.Required) }, transaction)).ToListAsync();

// do something with taskId

根据您的精简代码很难准确判断,但您可能可以在单个 SQL 批处理中执行整套命令。

© www.soinside.com 2019 - 2024. All rights reserved.