前几天,我碰到了一些奇怪的事情,我在下面编写了程序,起草了用F#编写的SQL Wrapper的代码:
open System
open System.Data
open System.Transactions
open FSharp.Control
open Npgsql
[<EntryPoint>]
let main _ =
let connectionStringBuilder = NpgsqlConnectionStringBuilder()
connectionStringBuilder.Host <- "localhost"
connectionStringBuilder.Port <- 5432
connectionStringBuilder.Username <- "postgres"
connectionStringBuilder.Password <- String.Empty
connectionStringBuilder.Database <- "event_store"
use connection = new NpgsqlConnection(connectionStringBuilder.ToString())
let wasClosed = connection.State = ConnectionState.Closed
// mutable cause it's a struct
let mutable transactionOptions = TransactionOptions()
transactionOptions.IsolationLevel <- IsolationLevel.ReadCommitted
use transactionScope = new TransactionScope( TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled)
async {
for i in [0 .. 200_000] do
if wasClosed then
do! connection.OpenAsync() |> Async.AwaitTask
use command = connection.CreateCommand()
command.CommandText <- "INSERT INTO sch_event_store.that_table (id, data) VALUES (DEFAULT, @data)"
let parameter = command.CreateParameter()
parameter.ParameterName <- "data"
parameter.Value <- string i
command.Parameters.Add(parameter) |> ignore
command.CommandTimeout <- 500
let! rowCount = command.ExecuteNonQueryAsync() |> Async.AwaitTask
if wasClosed then
connection.Close()
printfn "%A: %A" i rowCount
}
|> Async.RunSynchronously
transactionScope.Complete()
0
而且我在下面遇到了例外:
Npgsql.NpgsqlOperationInProgressException: The connection is already in state 'Executing'
at Npgsql.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|187_0(<>c__DisplayClass187_0& )
at Npgsql.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command)
at Npgsql.NpgsqlConnector.StartUserAction(NpgsqlCommand command)
at Npgsql.NpgsqlCommand.ExecuteReaderAsync(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteNonQuery(Boolean async, CancellationToken cancellationToken)
我很惊讶,因为该程序中的所有内容都在等待中,而且看起来好像连接没有按时正确关闭或打开。
为什么会这样?
我不确定这是由于我通过async
CE等待的方式,还是由于Npgsql库造成的。
[编辑]
甚至在没有async
CE +异步调用时,也会发生此问题:
open System
open System.Data
open System.Transactions
open Npgsql
[<EntryPoint>]
let main _ =
let connectionStringBuilder = NpgsqlConnectionStringBuilder()
connectionStringBuilder.Host <- "localhost"
connectionStringBuilder.Port <- 5432
connectionStringBuilder.Username <- "postgres"
connectionStringBuilder.Password <- String.Empty
connectionStringBuilder.Database <- "event_store"
use connection = new NpgsqlConnection(connectionStringBuilder.ToString())
let wasClosed = connection.State = ConnectionState.Closed
// mutable cause it's a struct
let mutable transactionOptions = TransactionOptions()
transactionOptions.IsolationLevel <- IsolationLevel.ReadCommitted
use transactionScope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions,TransactionScopeAsyncFlowOption.Enabled)
for i in [0 .. 200_000] do
if wasClosed then
connection.Open()
use command = connection.CreateCommand()
command.CommandText <- "INSERT INTO sch_event_store.that_table (id, data) VALUES (DEFAULT, @data)"
let parameter = command.CreateParameter()
parameter.ParameterName <- "data"
parameter.Value <- string i
command.Parameters.Add(parameter) |> ignore
command.CommandTimeout <- 500
let rowCount = command.ExecuteNonQuery()
if wasClosed then
connection.Close()
printfn "%A: %A" i rowCount
transactionScope.Complete()
0
Npgsql.NpgsqlOperationInProgressException: The connection is already in state 'Executing'
at Npgsql.NpgsqlConnector.<StartUserAction>g__DoStartUserAction|187_0(<>c__DisplayClass187_0& )
at Npgsql.NpgsqlConnector.StartUserAction(ConnectorState newState, NpgsqlCommand command)
at Npgsql.NpgsqlConnector.StartUserAction(NpgsqlCommand command)
at Npgsql.NpgsqlCommand.ExecuteReaderAsync(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteNonQuery(Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteNonQuery()
我还会不时收到另一个例外:
System.Transactions.TransactionException: The operation is not valid for the state of the transaction.
---> System.TimeoutException: Transaction Timeout
--- End of inner exception stack trace ---
at System.Transactions.TransactionState.EnlistVolatile(InternalTransaction tx, ISinglePhaseNotification enlistmentNotification, EnlistmentOptions enlistmentOptions, Transaction atomicTransaction)
at System.Transactions.Transaction.EnlistVolatile(ISinglePhaseNotification singlePhaseNotification, EnlistmentOptions enlistmentOptions)
at Npgsql.NpgsqlConnection.EnlistTransaction(Transaction transaction)
at Npgsql.NpgsqlConnection.<>c__DisplayClass32_0.<<Open>g__OpenLong|0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Npgsql.NpgsqlConnection.Open()
事实是,在C#中编码相同的逻辑没有问题:
using System;
using System.Data;
using System.Threading.Tasks;
using System.Transactions;
using Npgsql;
using IsolationLevel = System.Transactions.IsolationLevel;
namespace CSharpPlayground
{
public static class Program
{
public static async Task Main()
{
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "postgres",
Password = string.Empty,
Database = "event_store"
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
var transactionOptions = new TransactionOptions
{
IsolationLevel = IsolationLevel.ReadCommitted
};
using var transactionScope = new TransactionScope(
TransactionScopeOption.RequiresNew,
transactionOptions,
TransactionScopeAsyncFlowOption.Enabled);
var wasClosed = connection.State == ConnectionState.Closed;
for (var i = 0; i < 200_000; i++)
{
if (wasClosed)
{
await connection.OpenAsync();
}
using var command = connection.CreateCommand();
command.CommandText = "INSERT INTO sch_event_store.that_table (id, data) VALUES (DEFAULT, @data)";
var parameter = command.CreateParameter();
parameter.ParameterName = "data";
parameter.Value = i.ToString();
command.Parameters.Add(parameter);
var rowCount = command.ExecuteNonQuery();
Console.WriteLine($"{i}: {rowCount}");
if (wasClosed)
{
connection.Close();
}
}
transactionScope.Complete();
}
}
}
use
语句的行为与C#中的行为略有不同(如果我错了,请纠正我)。您可以将for循环包装在其他方法中,也可以手动处理transactionScope
。我已经尝试过两个版本,它们都可以工作。