我正在尝试通过Npgsql提供程序使用TransactionScope。我在一个老问题(provider for PostgreSQL in .net with support for TransactionScope)中发现Npgsql还不支持它。现在,大约5年后,Npgsql是否支持TransactionScope?我使用Npgsql 3.0.3并使用以下代码对自己进行了测试:
using (var scope = new TransactionScope())
{
using(var connection = new Npgsql.NpgsqlConnection("server=localhost;user id=*****;password=*****database=test;CommandTimeout=0"))
{
connection.Open();
var command = new NpgsqlCommand(@"insert into test.table1 values ('{10,20,30}', 2);");
command.Connection = connection;
var result = command.ExecuteNonQuery();
// scope.Complete(); <-- Not committed
}
}
任何人都可以确认Npgsql不支持TransactionScope吗?
编辑#1确认Npgsql对TransactionScope的支持后,我发现您需要确保在PostgreSQL配置中启用了分布式事务,请检查postgres.conf文件中的max_prepared_transactions参数(记住重新启动服务器)。
编辑#2我在服务器上启用了分布式事务,但是现在使用TransactionScope和Npgsql遇到错误。这是我的代码:
using (var sourceDbConnection = new NpgsqlConnection(SourceConnectionString))
using (var destinationDbConnection = new NpgsqlConnection(DestinationConnectionString))
using (var scope = new TransactionScope())
{
sourceDbConnection.Open();
destinationDbConnection.Open();
Logger.Info("Moving data for the {0} table.", TableName.ToUpper());
var innerStopWatch = new Stopwatch();
innerStopWatch.Start();
foreach (var entity in entities)
{
var etlEntity = new EtlInfoItem
{
MigratedEntityId = category.RowId,
ProjectId = category.ProjectId,
ExecutionDatetime = DateTime.Now
};
// Insert into the destination database
var isRowMigrated = InsertEntity(entity, DestinationSchema, destinationDbConnection);
if (isRowMigrated)
{
// Update the ETL migration table
InsertCompletedEtlMigrationEntity(etlEntity, EtlSchema, sourceDbConnection);
}
else
{
// Update the ETL migration table
InsertFailedEtlMigrationEntity(etlEntity, EtlSchema, sourceDbConnection);
}
}
Logger.Info("Data moved in {0} sec.", innerStopWatch.Elapsed);
Logger.Info("Committing transaction to the source database");
innerStopWatch.Restart();
scope.Complete();
innerStopWatch.Stop();
Logger.Info("Transaction committed in {0} sec.", innerStopWatch.Elapsed);
}
[当TransactionScope退出范围时(当退出using语句时),我得到一个Null Reference Exception,带有以下堆栈跟踪:服务器堆栈跟踪:在Npgsql.NpgsqlConnector.Cleanup()在Npgsql.NpgsqlConnector.Break()在Npgsql.NpgsqlConnector.ReadSingleMessage(DataRowLoadingMode dataRowLoadingMode,Boolean returnNullForAsyncMessage)在Npgsql.NpgsqlConnector.ReadExpectingT......它随机发生。
Npgsql确实支持TransactionScope,并且已经支持了相当一段时间。但是,至少就目前而言,要使您的连接参与TransactionScope,您必须:
Enlist=true
,或[查看一些示例中的Npgsql unit tests。
跟随我的交易范围代码不起作用。...返回客户时出错;
using Npgsql;
using System;
using System.Transactions;
using Insight.Database;
using System.Threading.Tasks;
using System.Threading;
namespace ConsoleAppTest
{
class Program
{
static string testDB1 = "server=localhost;user id=postgres;password=p@ssw0RD;database='test_db1';pooling=true";
static string testDB2 = "server=localhost;user id=postgres;password=p@ssw0RD;database='test_db2';pooling=true";
private static readonly TransactionOptions transactionOptions = new TransactionOptions
{
IsolationLevel = System.Transactions.IsolationLevel.ReadCommitted,
Timeout = TransactionManager.MaximumTimeout
};
static void Main(string[] args)
{
try
{
string ss = new Program().RunFunction().GetAwaiter().GetResult();
Console.WriteLine(ss);
}
catch (TransactionAbortedException ex)
{
Console.WriteLine("TransactionAbortedException Message: {0}", ex.Message);
}
}
async Task<string> RunFunction()
{
Customer customer = await SaveCustomer();
return "Success";
}
public static async Task<Customer> SaveCustomer()
{
try
{
NpgsqlConnection connectionDB1 = new NpgsqlConnection();
NpgsqlConnection connectionDB2 = new NpgsqlConnection();
Customer cust = new Customer();
using (var scope = new TransactionScope(TransactionScopeOption.Required, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
{
try
{
// Insert testDB1
//using (var connectionDB1 = new NpgsqlConnection(testDB1))
connectionDB1 = new NpgsqlConnection(testDB1);
//{
connectionDB1.Open();
connectionDB1.EnlistTransaction(Transaction.Current);
var sqlCustomer = "INSERT INTO customer.customerinfo(customername, mobile) VALUES('Chandan', 123456) RETURNING id ";
long custId = await connectionDB1.ExecuteScalarSqlAsync<long>(sqlCustomer);
//throw new Exception("Test Exception");
cust.CustomerId = custId;
//}
// Insert testDB2
//using (var connectionDB2 = new NpgsqlConnection(testDB2))
connectionDB2 = new NpgsqlConnection(testDB2);
//{
connectionDB2.Open();
connectionDB2.EnlistTransaction(Transaction.Current);
var sqlHistory = "INSERT INTO history.custhistory(operation) VALUES('Inserted') RETURNING id ";
long histId = await connectionDB2.ExecuteScalarSqlAsync<long>(sqlHistory);
cust.HistoryId = histId;
//}
scope.Complete();
}
catch (TransactionAbortedException ex)
{
Console.WriteLine("TransactionAbortedException Message: {0}", ex.Message);
}
catch (Exception ex)
{
scope.Dispose();
Console.WriteLine(ex);
}
finally
{
if (connectionDB1 != null)
connectionDB1.Close();
if (connectionDB2 != null)
connectionDB2.Close();
}
return cust;
}
}
catch(Exception expn)
{
throw expn;
}
}
public class Customer
{
public long CustomerId { get; set; }
//private string CustomerName { get; set; }
//private long Mobile { get; set; }
public long HistoryId { get; set; }
}
}
}