我正在尝试实现一个 postgresql -> Debezium Kafka Connect 源 -> Kafka -> Neo4j Kafka Connect 接收器 -> Neo4j。 Debezium 发送的数据包含包含字段
op = "c/u/d"
(创建/更新/删除)的事件。示例文档显示了一种使用 FOREACH
来测试创建/更新是否应该发生以及该部分是否有效的模式。我无法开始工作的是,如果发生 op = "d"
事件,如何删除节点(我在主题中看到它)。
我当前的卡夫卡主题密码行如下所示(已格式化,原始是一长行):
FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END |
MERGE (p:DemoTable{id: event.after.id})
SET p.message = event.after.message, p.last_changed = event.ts_ms
)
WITH event
MATCH (p:DemoTable{id: event.after.id})
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END |
DELETE p
)
这不会显示任何错误,但也不会删除任何节点。
我尝试了删除部分的多个版本,直到得到这个:
WITH event
(有关MATCH
和FOREACH
的错误仅适用于WITH
),MATCH
中不允许使用 FOREACH
),DELETE (p:DemoTable{id: event.after.id})
中使用FOREACH
(代码末尾关于expected whitespace or a relationship pattern
的错误)有条件地处理删除事件的正确模式是什么?
更新:教程现在描述了 CDC 到 Neo4j 模式,这似乎比下面的要容易得多(尚未尝试)。
旧答案:
发现问题:debezium 的删除事件有
event.after = null
,所以在删除中,需要 MATCH (p:DemoTable{id: event.before.id})
。
最终代码:
FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END |
MERGE (p:DemoTable{id: event.after.id})
SET p.message = event.after.message, p.last_changed = event.ts_ms
)
WITH event
MATCH (p:DemoTable{id: event.before.id})
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END |
DELETE p
)