如何使用 Npgsql 从 PostgreSQL 过程 refcursor 参数获取数据

问题描述 投票:0回答:2

我们正在从本地 SQL Server 迁移到云托管的 PostgreSQL。我们的数据库正在自动翻译,这导致了许多过程,其中数据作为引用参数而不是普通的记录集返回,例如

CREATE OR REPLACE PROCEDURE myschema_dbo.usp_getcustomers(
    IN par_includeinactive numeric,
    INOUT p_refcur refcursor)
LANGUAGE 'plpgsql'
AS $BODY$
BEGIN
    OPEN p_refcur FOR
    SELECT customer.name AS customername, customer.isactive, customer.emailaddress
        FROM myschema_dbo.customer
        WHERE customer.isactive = 1 OR par_IncludeInactive = 0
END;
$BODY$;

调用应用程序主要是 C#,我们尝试使用 Npgsql 数据提供程序来调用这些过程:

using (var dataSource = NpgsqlDataSource.Create(this.connectionString))
{
    using (var command = dataSource.CreateCommand("myschema_dbo.usp_getcustomers"))
    {
        command.CommandType = CommandType.StoredProcedure;
        command.Parameters.Add(new NpgsqlParameter()
        {
            ParameterName = "par_includeinactive",
            NpgsqlDbType = NpgsqlDbType.Numeric,
            Direction = ParameterDirection.Input,
            Value = 1
        });
        command.Parameters.Add(new NpgsqlParameter()
        {
            ParameterName = "p_refcur",
            NpgsqlDbType = NpgsqlDbType.Refcursor,
            Direction = ParameterDirection.InputOutput,
            Value = "p_refcur"
        });
        command.ExecuteNonQuery();

        command.CommandText = "fetch all in \"p_refcur\"";
        command.CommandType = CommandType.Text;
        var reader = command.ExecuteReader();
        while (reader.Read())
        {
            // Process the record
        }
    }
}

这会导致 Npgsql.PostgresException 并显示错误消息 34000:游标“p_refcur”不存在。由于我们数据库中的自动翻译过程和程序量,修改存储过程将是一项非常昂贵的操作,我们宁愿避免。

我尝试使用其值获取引用游标参数,或传递不同的值。我还搜索了堆栈溢出,但唯一类似的问题是指返回引用游标而不是用作过程参数的函数.

postgresql parameters procedure npgsql ref-cursor
2个回答
0
投票

我强烈建议修改这些存储过程以直接返回表而不是引用游标 - SQL Server 存储过程的自动转换似乎在那里做得不太好。

例如,上面的函数可以简化为以下内容:

CREATE FUNCTION usp_getcustomers(include_active bool) RETURNS customer
AS $$ 
    SELECT customer.name AS customername, customer.isactive, customer.emailaddress
    FROM myschema_dbo.customer
    WHERE customer.isactive = 1 OR NOT include_active
$$$
LANGUAGE SQL;

这样效率更高,因为您可以在一次往返中返回结果;如果您的函数返回引用游标,则需要执行额外的往返以从该引用游标获取结果;对于 PostgreSQL 来说,这也是不必要的复杂和不自然。

另请注意,对于如此简单的函数,您可以简单地使用 SQL 而不是 pl/pgsql。此外,PostgreSQL 有一个第一类布尔数据类型,您应该使用它,例如

include_active
而不是数字。


0
投票

乔纳森,

我相信您的问题是由于没有使用交易造成的。这是必需的,因为这是返回门户并从中读取内容的两步操作。

我注意到的另一件事是你应该将你的读者放在 using() 子句中。否则,如果您的读者没有读取所有行,则可能会在事务结束时导致事务出现问题(需要执行回滚/提交)。

所以只需将代码更改为下面的代码(我已经删除了 NpgsqlDataSource 的使用,因为我以前从未使用过它):

using (var conn = new NpgsqlConnection(this.connectionString))
{
    conn.Open();

    using (var trans = conn.BeginTransaction())
    {
        using (var command = new NpgsqlCommand("myschema_dbo.usp_getcustomers", conn))
        {
            command.CommandType = CommandType.StoredProcedure;
            command.Parameters.Add(new NpgsqlParameter()
            {
                ParameterName = "par_includeinactive",
                NpgsqlDbType = NpgsqlDbType.Numeric,
                Direction = ParameterDirection.Input,
                Value = 1
            });
            command.Parameters.Add(new NpgsqlParameter()
            {
                ParameterName = "p_refcur",
                NpgsqlDbType = NpgsqlDbType.Refcursor,
                Direction = ParameterDirection.InputOutput,
                Value = "p_refcur"
            });
            command.ExecuteNonQuery();

            command.CommandText = "fetch all in \"p_refcur\"";
            command.CommandType = CommandType.Text;
            using (var reader = command.ExecuteReader())
            {
                while (reader.Read())
                {
                    // Process the record
                }
            }
        }
    }
}

另一个选择是隐藏这个混乱并将其包装在 NpgsqlRefcursorCommand 类中。它将处理事务并添加引用。 然后你就可以像下面这样使用它:

using (var conn = new NpgsqlConnection(this.connectionString))
{
    conn.Open();

    using (var command = new NpgsqlRefcursorCommand("myschema_dbo.usp_getcustomers", conn))
    {
        command.CommandType = CommandType.StoredProcedure;
        command.Parameters.Add(new NpgsqlParameter()
        {
            ParameterName = "par_includeinactive",
            NpgsqlDbType = NpgsqlDbType.Numeric,
            Direction = ParameterDirection.Input,
            Value = 1
        });

        using (var reader = command.ExecuteReader())
        {
            while (reader.Read())
            {
                // Process the record
            }
        }
    }
}

NpgsqlRefcursorCommand 看起来像:

public class NpgsqlRefcursorCommand : NpgsqlCommand
{
    NpgsqlTransaction? mInternalTrans = null;

    public NpgsqlRefcursorCommand() 
        : base(null, null, null) { }

    public NpgsqlRefcursorCommand(string? cmdText) 
        : base(cmdText, null, null) { }

    public NpgsqlRefcursorCommand(string? cmdText, NpgsqlConnection? connection) 
        : base(cmdText, connection) { }

    public NpgsqlRefcursorCommand(string? cmdText, NpgsqlConnection? connection, NpgsqlTransaction? transaction)
        : base(cmdText, connection, transaction) { }

    public new NpgsqlDataReader ExecuteReader(CommandBehavior behavior = CommandBehavior.Default)
        => ExecuteReaderAsync(behavior, CancellationToken.None).GetAwaiter().GetResult();

    public new async Task<NpgsqlDataReader> ExecuteReaderAsync(CommandBehavior behavior = CommandBehavior.Default, CancellationToken cancellationToken = default)
    {
        if (Connection == null || Connection.State != ConnectionState.Open)
            throw new ArgumentException("Connection is not open!");

        if (Transaction == null)
        {
            using (NpgsqlCommand checkTransCmd = new NpgsqlCommand("select txid_current_if_assigned()", Connection))
            {
                object? oTransId = await checkTransCmd.ExecuteScalarAsync(cancellationToken);
                if (oTransId == DBNull.Value)
                {
                    mInternalTrans = await Connection.BeginTransactionAsync();
                }
            }
        }

        if (!Parameters.Any(x => x.NpgsqlDbType == NpgsqlDbType.Refcursor))
        {
            Parameters.Add(new NpgsqlParameter()
            {
                //ParameterName is not needed if procedure only has one ref cursor
                NpgsqlDbType = NpgsqlDbType.Refcursor,
                Direction = ParameterDirection.Input,
                Value = DBNull.Value
            });
        }

        var refCursorName = await ExecuteScalarAsync();

        if (refCursorName == null)
            throw new ArgumentException("Expected procedure to return cursor!");

        CommandText = $"fetch all in \"{refCursorName}\"";
        CommandType = CommandType.Text;

        return await base.ExecuteReaderAsync(behavior, cancellationToken);
    }

    protected override void Dispose(bool disposing)
    {
        if (mInternalTrans != null)
        {
            mInternalTrans.Rollback();
            mInternalTrans.Dispose();
        }
        base.Dispose(disposing);
    }

    public override ValueTask DisposeAsync()
    {
        if (mInternalTrans != null)
        {
            mInternalTrans.RollbackAsync();
            mInternalTrans.DisposeAsync();
        }
        return base.DisposeAsync();
    }
}

--约翰

© www.soinside.com 2019 - 2024. All rights reserved.