我有以下包含 CTE 的函数:
-- create_job: registers a job and either records its outstanding prerequisites / deferred
-- task data, or leaves it with no stored task data when nothing is holding it back.
--
-- Parameters (as used in the body below):
--   in_partition_id, in_job_id   identify the job; written to job_dependency and job_task_data.
--   in_name, in_description, in_data, in_job_hash, in_labels
--                                passed straight through to internal_create_job.
--   in_task_classifier, in_task_api_version, in_task_data, in_task_pipe, in_target_pipe
--                                stored in job_task_data when the task is deferred.
--   in_prerequisite_job_ids      prerequisite ids; each element is parsed by
--                                internal_get_prereq_job_id_options into (job_id, precreated).
--   in_delay                     delay in seconds (multiplied by interval '1 second' below).
--   in_suspended_partition       stored in job_task_data.suspended; also forces task data to be stored.
--
-- Returns: a single row, job_created = FALSE when internal_create_job reports FALSE,
-- otherwise job_created = TRUE.
CREATE OR REPLACE FUNCTION create_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
in_name VARCHAR(255),
in_description TEXT,
in_data TEXT,
in_job_hash INT,
in_task_classifier VARCHAR(255),
in_task_api_version INT,
in_task_data BYTEA,
in_task_pipe VARCHAR(255),
in_target_pipe VARCHAR(255),
in_prerequisite_job_ids VARCHAR(128)[],
in_delay INT,
in_labels VARCHAR(255)[][] default null,
in_suspended_partition BOOLEAN default false
)
RETURNS TABLE(
job_created BOOLEAN
)
LANGUAGE plpgsql
AS $$
BEGIN
-- Bail out early with a single FALSE row when internal_create_job does not report success.
IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN
RETURN QUERY SELECT FALSE;
RETURN;
END IF;
-- Everything from here to the INSERT's semicolon is ONE statement: the CTEs below are only
-- visible to the INSERT INTO public.job_dependency that terminates it.
-- Step 1: one row per element of the prerequisite array.
WITH prereqs_with_opts(job_id_with_opts) AS
(
SELECT unnest(in_prerequisite_job_ids)::VARCHAR(128)
),
-- Step 2: split each element into (job_id, precreated) and keep one row per job_id.
prereqs AS
(
-- Remove any duplicate pre-requisites, and if a pre-req is mentioned multiple times then merge the options
SELECT job_id, precreated FROM
(
-- ORDER BY precreated DESC decides which duplicate gets row_number = 1.
SELECT ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY precreated DESC), job_id, precreated
FROM prereqs_with_opts
CROSS JOIN internal_get_prereq_job_id_options(job_id_with_opts)
) tbl
WHERE row_number = 1
),
-- Step 3: row-lock the prerequisite jobs that already exist in this partition.
-- NOTE(review): the ORDER BY presumably gives a consistent lock order across callers - confirm.
locked_jobs AS
(
SELECT * FROM job
WHERE partition_id = in_partition_id
AND job_id IN (SELECT job_id FROM prereqs)
ORDER BY partition_id, job_id
FOR UPDATE
),
-- Step 4: hand the locked job ids to internal_update_job_progress and keep the rows it returns.
updated_jobs AS
(
SELECT * FROM internal_update_job_progress(in_partition_id, (SELECT ARRAY(SELECT job_id FROM locked_jobs)))
),
-- Step 5a: existing prerequisites whose status is anything other than 'Completed'.
prereqs_created_but_not_complete AS
(
SELECT * FROM updated_jobs uj
WHERE uj.partition_id = in_partition_id
AND uj.job_id IN (SELECT job_id FROM prereqs)
AND uj.status <> 'Completed'
),
-- Step 5b: prerequisites not flagged precreated that have no row in job for this partition.
-- NOTE(review): NOT IN yields no rows if the subquery can return a NULL job_id - verify job.job_id is NOT NULL.
prereqs_not_created_yet AS
(
SELECT * FROM prereqs
WHERE NOT precreated AND job_id NOT IN (
SELECT job_id FROM job WHERE partition_id = in_partition_id
)
),
-- Step 6: every prerequisite that still blocks this job (UNION removes duplicates).
all_incomplete_prereqs(prerequisite_job_id) AS
(
SELECT job_id FROM prereqs_created_but_not_complete
UNION
SELECT job_id FROM prereqs_not_created_yet
)
-- Primary query of the WITH statement: one dependency row per blocking prerequisite.
INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
SELECT in_partition_id, in_job_id, prerequisite_job_id
FROM all_incomplete_prereqs;
-- FOUND here reflects the INSERT above: true when at least one dependency row was inserted.
-- Task data is stored when the job is blocked, delayed, or its partition is suspended.
IF FOUND OR in_delay > 0 OR in_suspended_partition THEN
INSERT INTO public.job_task_data(
partition_id,
job_id,
task_classifier,
task_api_version,
task_data,
task_pipe,
target_pipe,
eligible_to_run_date,
suspended
) VALUES (
in_partition_id,
in_job_id,
in_task_classifier,
in_task_api_version,
in_task_data,
in_task_pipe,
in_target_pipe,
-- Only a job with no blocking prerequisites gets a run date (UTC now + delay seconds);
-- the CASE has no ELSE, so a blocked job stores NULL here.
CASE WHEN NOT FOUND THEN now() AT TIME ZONE 'UTC' + (in_delay * interval '1 second') END,
in_suspended_partition
);
END IF;
RETURN QUERY SELECT TRUE;
END
$$;
我想在
INSERT INTO public.job_dependency
语句之前(并且仍在 CTE 内部,因为该 CTE 会以 FOR UPDATE 方式锁定 job 表中的行)检查前置作业:只要其中任何一个处于 Failed
状态就引发异常,例如:
-- Last CTE of the WITH statement in create_job: every prerequisite that still blocks the job.
all_incomplete_prereqs(prerequisite_job_id) AS
(
SELECT job_id FROM prereqs_created_but_not_complete
UNION
SELECT job_id FROM prereqs_not_created_yet
)
-- NOTE(review): this SELECT ... INTO becomes the primary query of the WITH statement above,
-- so the statement (and the visibility of its CTEs) ends at the semicolon below.
-- NOTE(review): failed_prerequisite_job_ids is not declared anywhere in create_job as shown
-- (the function has no DECLARE section) - it would need one.
-- NOTE(review): unlike locked_jobs, this lookup does not filter on partition_id - confirm
-- whether job ids are unique across partitions.
SELECT ARRAY_AGG(job_id) INTO failed_prerequisite_job_ids
FROM job
WHERE job_id = ANY(in_prerequisite_job_ids)
AND status = 'Failed';
-- array_length() is NULL for a NULL array (ARRAY_AGG over zero rows), so the IF is skipped then.
IF array_length(failed_prerequisite_job_ids, 1) > 0 THEN
RAISE EXCEPTION 'One or more prerequisite jobs have failed. Failed Job IDs: %', ARRAY_TO_STRING(failed_prerequisite_job_ids, ', ') USING ERRCODE = '02000'; -- sqlstate no data
END IF;
-- This INSERT is a new, separate statement: all_incomplete_prereqs is no longer in scope here.
INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
SELECT in_partition_id, in_job_id, prerequisite_job_id
FROM all_incomplete_prereqs;
但是,上面的代码不起作用:我新增的语句成为了该 WITH 查询的主语句并将其结束,因此后面的 INSERT 无法再访问
all_incomplete_prereqs
这个 CTE(它并不是一张真正的表)。
除了更改CTE以创建大量临时表之外,还有其他解决方案吗?
我尝试过类似以下的操作,但不确定它是否会起作用:
将这些部分添加到 CTE:
-- Prerequisites in this partition whose status is 'Failed'.
-- NOTE(review): the WITH statement in create_job defines updated_jobs, not updated_prereqs -
-- this name is not defined in the code shown.
prereqs_created_but_failed AS
(
SELECT job_id FROM updated_prereqs uj
WHERE uj.partition_id = in_partition_id
AND uj.job_id IN (SELECT job_id FROM prereqs)
AND uj.status = 'Failed'
),
-- Calls the raising helper so the exception fires inside the WITH statement.
-- NOTE(review): a CTE name is not an array value; the helper takes VARCHAR[], so this
-- presumably needs something like ARRAY(SELECT job_id FROM prereqs_created_but_failed) - verify.
-- NOTE(review): a SELECT-only CTE that nothing references may never be evaluated; the primary
-- query probably has to read from this CTE for the check to run - confirm against the
-- PostgreSQL WITH-query documentation.
prereqs_created_but_failed_check AS
(
SELECT check_for_failed_prereqs(prereqs_created_but_failed)
),
有一个引发异常的附加函数:
-- check_for_failed_prereqs: guard that aborts the calling statement when any prerequisite
-- job has failed.
--
-- Parameters:
--   in_failed_prerequisite_job_ids  ids of prerequisite jobs found in the 'Failed' state;
--                                   may be NULL or empty when there are none.
-- Returns: FALSE (no failed prerequisites) whenever it returns at all.
-- Raises:  an exception with SQLSTATE '02000' listing the ids when the array is non-empty.
CREATE OR REPLACE FUNCTION check_for_failed_prereqs(in_failed_prerequisite_job_ids VARCHAR[])
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
BEGIN
    -- array_length() yields NULL for a NULL or empty array, so the condition is not true
    -- and execution falls through to the RETURN below.
    IF array_length(in_failed_prerequisite_job_ids, 1) > 0 THEN
        RAISE EXCEPTION 'One or more prerequisite jobs have failed. Failed Job IDs: %', ARRAY_TO_STRING(in_failed_prerequisite_job_ids, ', ') USING ERRCODE = '02000'; -- sqlstate no data
    END IF;
    -- A function declared RETURNS BOOLEAN must finish with RETURN; without it every
    -- non-raising call fails with "control reached end of function without RETURN".
    RETURN FALSE;
END
$$;
您可以编写一个引发所需错误的 PL/pgSQL 函数,并在 SQL 语句中调用该函数。请注意,优化器可能会选择在与您预期不同的点调用该函数 — SQL 不是过程语言。