为什么在F#中使用`TransactionScope`时,NpgsqlConnection不能正确地关闭或不能及时打开?

问题描述 投票:2回答:1

前几天,我碰到了一些奇怪的事情,我在下面编写了程序,起草了用F#编写的SQL Wrapper的代码:

open System

open System.Data
open System.Transactions

open FSharp.Control

open Npgsql


[<EntryPoint>]
let main _ =
    let connectionStringBuilder = NpgsqlConnectionStringBuilder()
    connectionStringBuilder.Host <- "localhost"
    connectionStringBuilder.Port <- 5432
    connectionStringBuilder.Username <- "postgres"
    connectionStringBuilder.Password <- String.Empty
    connectionStringBuilder.Database <- "event_store"

    use connection = new NpgsqlConnection(connectionStringBuilder.ToString())
    let wasClosed = connection.State = ConnectionState.Closed

    // mutable cause it's a struct
    let mutable transactionOptions = TransactionOptions()
    transactionOptions.IsolationLevel <- IsolationLevel.ReadCommitted
    use transactionScope = new TransactionScope( TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled)
    async {
        for i in [0 .. 200_000] do
            if wasClosed then
                do! connection.OpenAsync() |> Async.AwaitTask
            use command = connection.CreateCommand()
            command.CommandText <- "INSERT INTO sch_event_store.that_table (id, data) VALUES (DEFAULT, @data)"
            let parameter = command.CreateParameter()
            parameter.ParameterName <- "data"
            parameter.Value <- string i
            command.Parameters.Add(parameter) |> ignore
            command.CommandTimeout <- 500
            let! rowCount = command.ExecuteNonQueryAsync() |> Async.AwaitTask
            if wasClosed then
                connection.Close()
            printfn "%A: %A" i rowCount
    }
    |> Async.RunSynchronously
    transactionScope.Complete()
    0

而且我在下面遇到了例外:

Npgsql.NpgsqlOperationInProgressException: The connection is already in state 'Executing'
   at Npgsql.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|187_0(<>c__DisplayClass187_0& )
   at Npgsql.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command)
   at Npgsql.NpgsqlConnector.StartUserAction(NpgsqlCommand command)
   at Npgsql.NpgsqlCommand.ExecuteReaderAsync(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteNonQuery(Boolean async, CancellationToken cancellationToken)

我很惊讶,因为该程序中的所有内容都在等待中,而且看起来好像连接没有按时正确关闭或打开。

为什么会这样?

我不确定这是由于我通过async CE等待的方式,还是由于Npgsql库造成的。

[编辑]

甚至在没有async CE +异步调用时,也会发生此问题:

open System

open System.Data
open System.Transactions

open Npgsql


[<EntryPoint>]
let main _ =
    let connectionStringBuilder = NpgsqlConnectionStringBuilder()
    connectionStringBuilder.Host <- "localhost"
    connectionStringBuilder.Port <- 5432
    connectionStringBuilder.Username <- "postgres"
    connectionStringBuilder.Password <- String.Empty
    connectionStringBuilder.Database <- "event_store"

    use connection = new NpgsqlConnection(connectionStringBuilder.ToString())
    let wasClosed = connection.State = ConnectionState.Closed

    // mutable cause it's a struct
    let mutable transactionOptions = TransactionOptions()
    transactionOptions.IsolationLevel <- IsolationLevel.ReadCommitted
    use transactionScope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions,TransactionScopeAsyncFlowOption.Enabled)
    for i in [0 .. 200_000] do
        if wasClosed then
            connection.Open()
        use command = connection.CreateCommand()
        command.CommandText <- "INSERT INTO sch_event_store.that_table (id, data) VALUES (DEFAULT, @data)"
        let parameter = command.CreateParameter()
        parameter.ParameterName <- "data"
        parameter.Value <- string i
        command.Parameters.Add(parameter) |> ignore
        command.CommandTimeout <- 500
        let rowCount = command.ExecuteNonQuery()
        if wasClosed then
            connection.Close()
        printfn "%A: %A" i rowCount
    transactionScope.Complete()
    0
Npgsql.NpgsqlOperationInProgressException: The connection is already in state 'Executing'
   at Npgsql.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|187_0(<>c__DisplayClass187_0& )
   at Npgsql.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command)
   at Npgsql.NpgsqlConnector.StartUserAction(NpgsqlCommand command)
   at Npgsql.NpgsqlCommand.ExecuteReaderAsync(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteNonQuery(Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteNonQuery()

我还会不时收到另一个例外:

System.Transactions.TransactionException: The operation is not valid for the state of the transaction.
 ---> System.TimeoutException: Transaction Timeout
   --- End of inner exception stack trace ---
   at System.Transactions.TransactionState.EnlistVolatile(InternalTransaction tx, ISinglePhaseNotification enlistmentNotification, EnlistmentOptions enlistmentOptions, Transaction atomicTransaction)
   at System.Transactions.Transaction.EnlistVolatile(ISinglePhaseNotification singlePhaseNotification, EnlistmentOptions enlistmentOptions)
   at Npgsql.NpgsqlConnection.EnlistTransaction(Transaction transaction)
   at Npgsql.NpgsqlConnection.<>c__DisplayClass32_0.<<Open>g__OpenLong|0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Npgsql.NpgsqlConnection.Open()

事实是,在C#中编码相同的逻辑没有问题:

using System;
using System.Data;
using System.Threading.Tasks;
using System.Transactions;
using Npgsql;
using IsolationLevel = System.Transactions.IsolationLevel;


namespace CSharpPlayground
{
    public static class Program
    {
        public static async Task Main()
        {
            var connectionStringBuilder = new NpgsqlConnectionStringBuilder
            {
                Host = "localhost",
                Port = 5432,
                Username = "postgres",
                Password = string.Empty,
                Database = "event_store"
            };

            using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());

            var transactionOptions = new TransactionOptions
            {
                IsolationLevel = IsolationLevel.ReadCommitted
            };

            using var transactionScope = new TransactionScope(
                TransactionScopeOption.RequiresNew,
                transactionOptions,
                TransactionScopeAsyncFlowOption.Enabled);

            var wasClosed = connection.State == ConnectionState.Closed;

            for (var i = 0; i < 200_000; i++)
            {
                if (wasClosed)
                {
                    await connection.OpenAsync();
                }
                using var command = connection.CreateCommand();
                command.CommandText = "INSERT INTO sch_event_store.that_table (id, data) VALUES (DEFAULT, @data)";
                var parameter = command.CreateParameter();
                parameter.ParameterName = "data";
                parameter.Value = i.ToString();
                command.Parameters.Add(parameter);
                var rowCount = command.ExecuteNonQuery();
                Console.WriteLine($"{i}: {rowCount}");
                if (wasClosed)
                {
                    connection.Close();
                }
            }

            transactionScope.Complete();
        }
    }
}
postgresql asynchronous async-await f# npgsql
1个回答
0
投票

use语句的行为与C#中的行为略有不同(如果我错了,请纠正我)。您可以将for循环包装在其他方法中,也可以手动处理transactionScope。我已经尝试过两个版本,它们都可以工作。

© www.soinside.com 2019 - 2024. All rights reserved.