给定一个如下所示的有向无环图:
分支 a 头 id 分支 b 头 id | | id-Z id-C | | id-Y id-B | | id-X id-A | | 分支 a 第一个 id 分支 b 第一个 id | | \ / \ / \ / 共同祖先 ID | id-2 | id-1 | 初始 ID
并以 PostgreSQL 表结构建模,例如:
从图表中选择*; 编号 |家长 -------------------+-------------- 初始 ID | (无效的) id-1 |初始 ID id-2 | id-1 共同祖先 ID | id-2 分支第一个 id |共同祖先 ID id-X |分支第一 ID id-Y | id-X id-Z | id-Y 分支头 id | id-Z 分支 b 第一个 id |共同祖先 ID id-A |分支 B 第一 ID id-B | id-A id-C | id-B 分支 B 头 ID | id-C
我需要一个查询来查找共同祖先。这是一个有效的查询但效率低下:
WITH RECURSIVE linked_list AS(
SELECT id, parent FROM graphtable WHERE id = 'branch-a-head-id' OR id = 'branch-b-head-id'
UNION ALL
SELECT g.id, g.parent FROM graphtable g INNER JOIN linked_list ll ON ll.parent = g.id
)
SELECT string_agg(id, ',') AS ids, parent FROM linked_list GROUP BY parent HAVING COUNT(DISTINCT id) > 1;
对于上述结构返回:
id |家长 ----------------------------------+------------------------ -------- 分支 a 第一个 id、分支 b 第一个 id |共同祖先 ID
这是低效的,因为它总是遍历所有行,直到找到父级为 NULL 的行。这意味着,如果您的结构中 A 和 B 分支都很小(各 100 行),但公共部分很大(数百万行),它将迭代数百万行而不是数百行。
这是一个基准,A 和 B 分支各有 100 个节点,公共节点数量可变:
# 个节点 | 查询时间(秒) |
---|---|
16 | 0.011 |
32 | 0.008 |
64 | 0.011 |
128 | 0.014 |
256 | 0.026 |
512 | 0.058 |
1024 | 0.146 |
2048 | 0.44 |
4096 | 0.028 |
8192 | 0.049 |
16384 | 0.104 |
32768 | 0.209 |
65536 | 0.423 |
131072 | 0.859 |
262144 | 1.781 |
524288 | 3.914 |
问题:如何使此查询高效?每个分支保持 100 个节点,并且仅改变公共节点的数量,这似乎是 O(N)。希望 O(1) 是可能的。
我对 Postgres 特别感兴趣。谢谢!
尽管 PL/pgSQL 与 SQL 相比相当慢,但我仍然建议编写数据库函数。该算法是程序化的,因此这是一个自然的选择。这样就很容易避免获取任一参数的all祖先:
CREATE OR REPLACE FUNCTION first_common_ancestor(a text, b text) RETURNS text
LANGUAGE plpgsql AS
$$DECLARE
iter integer := 1;
a_ancestors text[] := ARRAY[a];
b_ancestors text[] := ARRAY[b];
a_next text;
b_next text;
a_done boolean := FALSE;
b_done boolean := FALSE;
BEGIN
WHILE NOT a_done AND NOT b_done LOOP
/* test if we have found a common ancestor */
IF ARRAY[a_ancestors[iter]] && b_ancestors THEN
RETURN a_ancestors[iter];
END IF;
IF ARRAY[b_ancestors[iter]] && a_ancestors THEN
RETURN b_ancestors[iter];
END IF;
/* get the next ancestors */
SELECT parent INTO a_next
FROM graphtable
WHERE id = a_ancestors[iter];
IF FOUND THEN
a_ancestors := a_ancestors || a_next;
ELSE
a_done := TRUE;
END IF;
SELECT parent INTO b_next
FROM graphtable
WHERE id = b_ancestors[iter];
IF FOUND THEN
b_ancestors := b_ancestors || b_next;
ELSE
b_done := TRUE;
END IF;
iter := iter + 1;
END LOOP;
/* no common ancestor */
RETURN NULL;
END;$$;