有条件地从处理 postgresql 数据库中的 debezium 数据的 kafka 连接中删除 Neo4j 中的节点

问题描述 投票:0回答:1

我正在尝试实现一个 postgresql -> Debezium Kafka Connect 源 -> Kafka -> Neo4j Kafka Connect 接收器 -> Neo4j。 Debezium 发送的数据包含包含字段

op = "c/u/d"
(创建/更新/删除)的事件。示例文档显示了一种使用
FOREACH
来测试创建/更新是否应该发生以及该部分是否有效的模式。我无法开始工作的是,如果发生
op = "d"
事件,如何删除节点(我在主题中看到它)。

我当前的卡夫卡主题密码行如下所示(已格式化,原始是一长行):

FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END | 
  MERGE (p:DemoTable{id: event.after.id}) 
  SET p.message = event.after.message, p.last_changed = event.ts_ms
) 
WITH event
MATCH (p:DemoTable{id: event.after.id}) 
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END | 
  DELETE p
)

这不会显示任何错误,但也不会删除任何节点。

我尝试了删除部分的多个版本,直到得到这个:

  • 没有
    WITH event
    (有关
    MATCH
    FOREACH
    的错误仅适用于
    WITH
    ),
  • 在 foreach 中运行匹配(
    MATCH
    中不允许使用
    FOREACH
    ),
  • DELETE (p:DemoTable{id: event.after.id})
    中使用
    FOREACH
    (代码末尾关于
    expected whitespace or a relationship pattern
    的错误)

有条件地处理删除事件的正确模式是什么?

apache-kafka neo4j debezium
1个回答
0
投票

更新:教程现在描述了 CDC 到 Neo4j 模式,这似乎比下面的要容易得多(尚未尝试)。

旧答案:

发现问题:debezium 的删除事件有

event.after = null
,所以在删除中,需要
MATCH (p:DemoTable{id: event.before.id})

最终代码:

FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END | 
  MERGE (p:DemoTable{id: event.after.id}) 
  SET p.message = event.after.message, p.last_changed = event.ts_ms
) 
WITH event
MATCH (p:DemoTable{id: event.before.id}) 
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END | 
  DELETE p
)
© www.soinside.com 2019 - 2024. All rights reserved.