Does the Npgsql provider support TransactionScope?

Question  Votes: 6  Answers: 1

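I am trying to use TransactionScope with the Npgsql provider. An older question (provider for PostgreSQL in .net with support for TransactionScope) says that Npgsql did not support it yet. Now, about 5 years later, does Npgsql support TransactionScope? I am using Npgsql 3.0.3 and tested it myself with the following code: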

using (var scope = new TransactionScope())
{
    using(var connection = new Npgsql.NpgsqlConnection("server=localhost;user id=*****;password=*****;database=test;CommandTimeout=0"))
    {
        connection.Open();

        var command = new NpgsqlCommand(@"insert into test.table1 values ('{10,20,30}', 2);");
        command.Connection = connection;
        var result = command.ExecuteNonQuery();

        // scope.Complete();  <-- Not committed
    }
}

Can anyone confirm that Npgsql does not support TransactionScope?

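Edit #1: After confirming that Npgsql supports TransactionScope, I found that distributed transactions must also be enabled in the PostgreSQL configuration. Check the max_prepared_transactions parameter in the postgresql.conf file (and remember to restart the server).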
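One way to verify that setting from client code is to ask the server for it. The following is a minimal sketch, assuming a reachable server and a valid connection string; the class and method names (PreparedTransactionCheck, IsEnabled, ReadSetting) are hypothetical and not part of Npgsql. In PostgreSQL, a max_prepared_transactions value of 0 disables prepared transactions.

```csharp
using System;
using System.Globalization;
using Npgsql;

public static class PreparedTransactionCheck
{
    // Hypothetical helper: interprets the text returned by SHOW.
    // A value of 0 means prepared (two-phase) transactions are disabled.
    public static bool IsEnabled(string settingValue)
    {
        return int.Parse(settingValue, CultureInfo.InvariantCulture) > 0;
    }

    // Hypothetical helper: reads the current setting from the server.
    public static string ReadSetting(string connectionString)
    {
        using (var connection = new NpgsqlConnection(connectionString))
        {
            connection.Open();
            using (var command = new NpgsqlCommand("SHOW max_prepared_transactions", connection))
            {
                // SHOW returns a single text value.
                return Convert.ToString(command.ExecuteScalar(), CultureInfo.InvariantCulture);
            }
        }
    }
}
```

Calling PreparedTransactionCheck.IsEnabled(PreparedTransactionCheck.ReadSetting(connectionString)) after the server restart shows whether the change took effect.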

Edit #2: I enabled distributed transactions on the server, but now I get an error when using TransactionScope with Npgsql. This is my code:

using (var sourceDbConnection = new NpgsqlConnection(SourceConnectionString))
using (var destinationDbConnection = new NpgsqlConnection(DestinationConnectionString))
using (var scope = new TransactionScope())
    {
        sourceDbConnection.Open();
        destinationDbConnection.Open();

        Logger.Info("Moving data for the {0} table.", TableName.ToUpper());
        var innerStopWatch = new Stopwatch();
        innerStopWatch.Start();

        foreach (var entity in entities)
        {
            var etlEntity = new EtlInfoItem
            {
                MigratedEntityId = category.RowId,
                ProjectId = category.ProjectId,
                ExecutionDatetime = DateTime.Now
            };

            // Insert into the destination database
            var isRowMigrated = InsertEntity(entity, DestinationSchema, destinationDbConnection);

            if (isRowMigrated)
            {
                // Update the ETL migration table
                InsertCompletedEtlMigrationEntity(etlEntity, EtlSchema, sourceDbConnection);
            }
            else
            {
                // Update the ETL migration table
                InsertFailedEtlMigrationEntity(etlEntity, EtlSchema, sourceDbConnection);
            }
        }

        Logger.Info("Data moved in {0} sec.", innerStopWatch.Elapsed);

        Logger.Info("Committing transaction to the source database");
        innerStopWatch.Restart();

        scope.Complete();

        innerStopWatch.Stop();
        Logger.Info("Transaction committed in {0} sec.", innerStopWatch.Elapsed);
    }

When the TransactionScope goes out of scope (on leaving the using statement), I get a NullReferenceException with the following stack trace:

Server stack trace:
   at Npgsql.NpgsqlConnector.Cleanup()
   at Npgsql.NpgsqlConnector.Break()
   at Npgsql.NpgsqlConnector.ReadSingleMessage(DataRowLoadingMode dataRowLoadingMode, Boolean returnNullForAsyncMessage)
   at Npgsql.NpgsqlConnector.ReadExpectingT......

It happens randomly.

npgsql
1 Answer
6
votes

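Npgsql does support TransactionScope, and has for quite some time. However, at least for now, for your connection to participate in the TransactionScope you must either:

  1. Include Enlist=true in the connection string, or
  2. Call NpgsqlConnection.EnlistTransaction

See the Npgsql unit tests for some examples.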
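The two options can be sketched as follows. This is an illustrative sketch rather than code from the Npgsql test suite: the class and method names (EnlistExamples, WithEnlist, InsertWithEnlistKeyword, InsertWithExplicitEnlist) are hypothetical, and the connection string and SQL are supplied by the caller. It assumes the Enlist property on NpgsqlConnectionStringBuilder is available in the Npgsql version in use; newer versions may enlist by default, so check the documentation for your version.

```csharp
using System.Transactions;
using Npgsql;

public static class EnlistExamples
{
    // Hypothetical helper: returns the connection string with Enlist=true set.
    public static string WithEnlist(string connectionString)
    {
        var builder = new NpgsqlConnectionStringBuilder(connectionString) { Enlist = true };
        return builder.ConnectionString;
    }

    // Option 1: Enlist=true in the connection string.
    // The connection joins the ambient transaction when it is opened inside the scope.
    public static void InsertWithEnlistKeyword(string connectionString, string sql)
    {
        using (var scope = new TransactionScope())
        using (var connection = new NpgsqlConnection(WithEnlist(connectionString)))
        {
            connection.Open();
            using (var command = new NpgsqlCommand(sql, connection))
            {
                command.ExecuteNonQuery();
            }
            // Without this call, disposing the scope rolls the work back.
            scope.Complete();
        }
    }

    // Option 2: enlist explicitly after opening the connection.
    public static void InsertWithExplicitEnlist(string connectionString, string sql)
    {
        using (var scope = new TransactionScope())
        using (var connection = new NpgsqlConnection(connectionString))
        {
            connection.Open();
            connection.EnlistTransaction(Transaction.Current);
            using (var command = new NpgsqlCommand(sql, connection))
            {
                command.ExecuteNonQuery();
            }
            scope.Complete();
        }
    }
}
```

In both variants the connection is opened inside the scope, so Transaction.Current is already set when enlistment happens.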


0
votes

My TransactionScope code below does not work. The error occurs at the return cust; line.

using Npgsql;
using System;
using System.Transactions;
using Insight.Database;
using System.Threading.Tasks;
using System.Threading;

namespace ConsoleAppTest
{
    class Program
    {
        static string testDB1 = "server=localhost;user id=postgres;password=p@ssw0RD;database='test_db1';pooling=true";
        static string testDB2 = "server=localhost;user id=postgres;password=p@ssw0RD;database='test_db2';pooling=true";
        private static readonly TransactionOptions transactionOptions = new TransactionOptions
        {
            IsolationLevel = System.Transactions.IsolationLevel.ReadCommitted,
            Timeout = TransactionManager.MaximumTimeout
        };

        static void Main(string[] args)
        {
            try
            {
                string ss = new Program().RunFunction().GetAwaiter().GetResult();
                Console.WriteLine(ss);
            }
            catch (TransactionAbortedException ex)
            {
                Console.WriteLine("TransactionAbortedException Message: {0}", ex.Message);
            }
        }

        async Task<string> RunFunction()
        {
            Customer customer = await SaveCustomer();
            return "Success";
        }

        public static async Task<Customer> SaveCustomer()
        {
            try
            {
                NpgsqlConnection connectionDB1 = new NpgsqlConnection();
                NpgsqlConnection connectionDB2 = new NpgsqlConnection();
                Customer cust = new Customer();
                using (var scope = new TransactionScope(TransactionScopeOption.Required, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
                {
                    try
                    {
                        // Insert testDB1
                        //using (var connectionDB1 = new NpgsqlConnection(testDB1))
                        connectionDB1 = new NpgsqlConnection(testDB1);
                        //{
                        connectionDB1.Open();
                        connectionDB1.EnlistTransaction(Transaction.Current);
                        var sqlCustomer = "INSERT INTO customer.customerinfo(customername, mobile) VALUES('Chandan', 123456) RETURNING id ";
                        long custId = await connectionDB1.ExecuteScalarSqlAsync<long>(sqlCustomer);

                        //throw new Exception("Test Exception");
                        cust.CustomerId = custId;
                        //}

                        // Insert testDB2
                        //using (var connectionDB2 = new NpgsqlConnection(testDB2))
                        connectionDB2 = new NpgsqlConnection(testDB2);
                        //{
                        connectionDB2.Open();
                        connectionDB2.EnlistTransaction(Transaction.Current);
                        var sqlHistory = "INSERT INTO history.custhistory(operation) VALUES('Inserted') RETURNING id ";
                        long histId = await connectionDB2.ExecuteScalarSqlAsync<long>(sqlHistory);
                        cust.HistoryId = histId;
                        //}

                        scope.Complete();

                    }
                    catch (TransactionAbortedException ex)
                    {
                        Console.WriteLine("TransactionAbortedException Message: {0}", ex.Message);
                    }
                    catch (Exception ex)
                    {
                        scope.Dispose();
                        Console.WriteLine(ex);
                    }
                    finally
                    {
                        if (connectionDB1 != null)
                            connectionDB1.Close();
                        if (connectionDB2 != null)
                            connectionDB2.Close();
                    }

                    return cust;
                }
            }
            catch(Exception expn)
            {
                throw; // rethrow without resetting the stack trace
            }
        }

        public class Customer
        {
            public long CustomerId { get; set; }
            //private string CustomerName { get; set; }
            //private long Mobile { get; set; }
            public long HistoryId { get; set; }
        }
    }
}