假设有两个数据库表被 Kafka Connect for CDC 监控:
TABLE A
-------
ID
NAME
B_ID (FK)
TABLE B
-------
ID
NAME
我有一个 KStream 应用程序,它将从 cdc 主题(topicA 和 topicB)中监听以生成合并记录,例如
话题B
{idB: 1, nameB: "dep1"}
主题A
{idA: 1, nameA: "joe", b_id: 1}
在这种情况下会有一个连接(
a.join(b)
)这样我就可以访问b.name,生成这样的合并记录:
主题合并
{idA: 1, nameA: "joe", nameB: "dep1"}
当主题a有更新时,这个join真的很简单:
val join: KTable<AKey, Merged> = aKTable.join(bKTable,
{ it: AValue -> BKey(it.bId) }) { aValue, bValue ->
logger.info("Joining a {} and b {}", aValue, bValue)
merge(aValue, bValue)
}.toStream().to("merged")
当表 B 发生变化时,困难就来了。在这种情况下,我将不得不从主题 A 中检索其 fk 与更改记录匹配的所有记录。
回到前面的例子,我们生成了这个合并记录
{idA: 1, nameA: "joe", bName: "dep1"}
如果表/主题 B 将其 idB 1 记录更改为
{idB: 1, nameB: "department1"}
我应该能够从主题 A 中检索所有使用 B.id:1 的记录。但是我无法将 bKTable 与 aKTable 结合起来,因为 FK 在第一个中。
这怎么可能?