从 CTE (Postgres) 内部引发异常

问题描述 投票:0回答:1

我有以下包含 CTE 的函数:

CREATE OR REPLACE FUNCTION create_job(
    in_partition_id VARCHAR(40),
    in_job_id VARCHAR(48),
    in_name VARCHAR(255),
    in_description TEXT,
    in_data TEXT,
    in_job_hash INT,
    in_task_classifier VARCHAR(255),
    in_task_api_version INT,
    in_task_data BYTEA,
    in_task_pipe VARCHAR(255),
    in_target_pipe VARCHAR(255),
    in_prerequisite_job_ids VARCHAR(128)[],
    in_delay INT,
    in_labels VARCHAR(255)[][] default null,
    in_suspended_partition BOOLEAN default false
)
RETURNS TABLE(
    job_created BOOLEAN
)
LANGUAGE plpgsql
AS $$
BEGIN
    IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN
        RETURN QUERY SELECT FALSE;
        RETURN;
    END IF;

    WITH prereqs_with_opts(job_id_with_opts) AS
    (
        SELECT unnest(in_prerequisite_job_ids)::VARCHAR(128)
    ),
    prereqs AS
    (
        -- Remove any duplicate pre-requisites, and if a pre-req is mentioned multiple times then merge the options
        SELECT job_id, precreated FROM
        (
            SELECT ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY precreated DESC), job_id, precreated
            FROM prereqs_with_opts
            CROSS JOIN internal_get_prereq_job_id_options(job_id_with_opts)
        ) tbl
        WHERE row_number = 1
    ),
    locked_jobs AS
    (
        SELECT * FROM job
        WHERE partition_id = in_partition_id
            AND job_id IN (SELECT job_id FROM prereqs)
        ORDER BY partition_id, job_id
        FOR UPDATE
    ),
    updated_jobs AS
    (
        SELECT * FROM internal_update_job_progress(in_partition_id, (SELECT ARRAY(SELECT job_id FROM locked_jobs)))
    ),
    prereqs_created_but_not_complete AS
    (
        SELECT * FROM updated_jobs uj
        WHERE uj.partition_id = in_partition_id
            AND uj.job_id IN (SELECT job_id FROM prereqs)
            AND uj.status <> 'Completed'
    ),
    prereqs_not_created_yet AS
    (
        SELECT * FROM prereqs
        WHERE NOT precreated AND job_id NOT IN (
            SELECT job_id FROM job WHERE partition_id = in_partition_id
        )
    ),
    all_incomplete_prereqs(prerequisite_job_id) AS
    (
        SELECT job_id FROM prereqs_created_but_not_complete
        UNION
        SELECT job_id FROM prereqs_not_created_yet
    )

    INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
    SELECT in_partition_id, in_job_id, prerequisite_job_id
    FROM all_incomplete_prereqs;

    IF FOUND OR in_delay > 0 OR in_suspended_partition THEN
        INSERT INTO public.job_task_data(
            partition_id,
            job_id,
            task_classifier,
            task_api_version,
            task_data,
            task_pipe,
            target_pipe,
            eligible_to_run_date,
            suspended
        ) VALUES (
            in_partition_id,
            in_job_id,
            in_task_classifier,
            in_task_api_version,
            in_task_data,
            in_task_pipe,
            in_target_pipe,
            CASE WHEN NOT FOUND THEN now() AT TIME ZONE 'UTC' + (in_delay * interval '1 second') END,
            in_suspended_partition
        );
    END IF;

    RETURN QUERY SELECT TRUE;
END
$$;

我想做的是在

INSERT INTO public.job_dependency
语句之前引发异常(并且仍在 CTE 内,因为它锁定作业表以进行更新),如果任何作业具有
Failed
状态,例如如下所示:

all_incomplete_prereqs(prerequisite_job_id) AS
(
    SELECT job_id FROM prereqs_created_but_not_complete
    UNION
    SELECT job_id FROM prereqs_not_created_yet
)


SELECT ARRAY_AGG(job_id) INTO failed_prerequisite_job_ids
FROM job
WHERE job_id = ANY(in_prerequisite_job_ids)
  AND status = 'Failed';

IF array_length(failed_prerequisite_job_ids, 1) > 0 THEN
    RAISE EXCEPTION 'One or more prerequisite jobs have failed. Failed Job IDs: %', ARRAY_TO_STRING(failed_prerequisite_job_ids, ', ') USING ERRCODE = '02000'; -- sqlstate no data
END IF;

INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
SELECT in_partition_id, in_job_id, prerequisite_job_id
FROM all_incomplete_prereqs;

但是,上面的代码将不起作用,因为我添加的新语句终止了 CTE,这意味着后面的代码无法访问

all_incomplete_prereqs
表。

除了更改CTE以创建大量临时表之外,还有其他解决方案吗?

我尝试过类似以下的操作,但不确定它是否会起作用:

将这些部分添加到 CTE:

prereqs_created_but_failed AS
(
    SELECT job_id FROM updated_prereqs uj
    WHERE uj.partition_id = in_partition_id
        AND uj.job_id IN (SELECT job_id FROM prereqs)
        AND uj.status = 'Failed'
),
prereqs_created_but_failed_check AS
(
    SELECT check_for_failed_prereqs(prereqs_created_but_failed)
),

有一个引发异常的附加函数:

CREATE OR REPLACE FUNCTION check_for_failed_prereqs(in_failed_prerequisite_job_ids VARCHAR[])
    RETURNS BOOLEAN
    LANGUAGE plpgsql
AS $$
BEGIN
    IF array_length(in_failed_prerequisite_job_ids, 1) > 0 THEN
        RAISE EXCEPTION 'One or more prerequisite jobs have failed. Failed Job IDs: %', ARRAY_TO_STRING(in_failed_prerequisite_job_ids, ', ') USING ERRCODE = '02000'; -- sqlstate no data
    END IF;
END
$$;
sql postgresql
1个回答
0
投票

您可以编写一个引发所需错误的 PL/pgSQL 函数,并在 SQL 语句中调用该函数。请注意,优化器可能会选择在与您预期不同的点调用该函数 — SQL 不是过程语言。

© www.soinside.com 2019 - 2024. All rights reserved.