假设有一个(简化的)类
A
,如下所示:
class A {
private Long id;
private String someContent;
private Long fk1;
private Long fk2;
// Getters and setters accordingly
}
fk1
和fk2
都指向另一个实体的PK(为简洁起见,未显示,与问题无关)。
现在假设一个 Kafka 主题是从类
A
的实例构建的,由其 id
键入。
将主题提取到 Kafka Streams 应用程序中,如何通过
KTable<Long, A>
和 fk1
重新键入生成的 fk2
?请注意,生成的键不应该被更改或变成某种复合键,因为它会在以后的连接操作中使用。
我的(天真的)解决方案涉及从输入流创建两个KTable
,相应地通过
fk1
和fk2
重新键入它们,然后外部连接两个结果(重新键入)KTable
。KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));
KTable<Long, A> rekeyedByFk1 = in
.toTable()
.groupBy(
(key, value) -> KeyValue.pair(value.fk1(), value),
Grouped.with(...))
.aggregate(
Aggregate::new,
(key, value, aggregate) -> aggregate.add(value),
(key, value, aggregate) -> aggregate.remove(value),
Materialized.with(...));
KTable<Long, a> rekeyedByFk2 = in
.toTable()
.groupBy(
(key, value) -> KeyValue.pair(value.fk2(), value),
Grouped.with(...))
.aggregate(
... same as above
);
KTable<Long, A> joined = rekeyedByFk1
.outerJoin(
rekeyedByFk2,
<value joiner>)
.groupBy(KeyValue::pair, Grouped.with(...)).aggregate(...);
<value joiner>
将整合(已预先加入的)
Aggregate
以避免重复。这看起来是一个可行的解决方案,还是有更好/更简单/更有效的实现?
KTable<Long, A> in = streamsBuilder.table(...);
// both fk-tables
KTable<Long, B> t1 = streamsBuilder.table(...);
KTable<Long, C> t2 = streamsBuilder.table(...);
KTable<Long, AEnriched> result =
in.join(t1, (k,v) -> v.fk1,...)
.join(t2, (k,v) -> v.fk2,...);
注意“ForeignKeyExtractor”函数,它告诉 Kafka Streams 哪个左表值属性应用作 FK 来查找右表。
还有一个关于 FK-table-table 连接的很好的教程:
https://developer.confluence.io/tutorials/foreign-key-joins/kstreams.htmlhttps://www.confluence.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/