Kafka Streams:通过两个或多个外键重新设置 KTable

问题描述 投票:0回答:1

假设有一个(简化的)类

A
,如下所示:

class A {
  private Long id;
  private String someContent;
  private Long fk1;
  private Long fk2;
  // Getters and setters accordingly
}

fk1
fk2
都指向另一个实体的PK(为简洁起见,未显示,与问题无关)。

现在假设一个 Kafka 主题是从类

A
的实例构建的,由其
id
键入。

将主题提取到 Kafka Streams 应用程序中,如何通过

KTable<Long, A>
fk1
重新键入生成的
fk2
?请注意,生成的键不应该被更改或变成某种复合键,因为它会在以后的连接操作中使用。 我的(天真的)解决方案涉及从输入流创建两个

KTable

,相应地通过

fk1
fk2
重新键入它们,然后外部连接两个结果(重新键入)
KTable
KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));

KTable<Long, A> rekeyedByFk1 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk1(), value),
        Grouped.with(...))
    .aggregate(
        Aggregate::new,
        (key, value, aggregate) -> aggregate.add(value),
        (key, value, aggregate) -> aggregate.remove(value),
        Materialized.with(...));

KTable<Long, a> rekeyedByFk2 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk2(), value),
        Grouped.with(...))
    .aggregate(
        ... same as above
    );

KTable<Long, A> joined = rekeyedByFk1
    .outerJoin(
        rekeyedByFk2,
        <value joiner>)
    .groupBy(KeyValue::pair, Grouped.with(...)).aggregate(...);

<value joiner>

将整合(已预先加入的)

Aggregate
以避免重复。
这看起来是一个可行的解决方案,还是有更好/更简单/更有效的实现?

java apache-kafka-streams
1个回答
0
投票

KTable<Long, A> in = streamsBuilder.table(...); // both fk-tables KTable<Long, B> t1 = streamsBuilder.table(...); KTable<Long, C> t2 = streamsBuilder.table(...); KTable<Long, AEnriched> result = in.join(t1, (k,v) -> v.fk1,...) .join(t2, (k,v) -> v.fk2,...);

注意“ForeignKeyExtractor”函数,它告诉 Kafka Streams 哪个左表值属性应用作 FK 来查找右表。

还有一个关于 FK-table-table 连接的很好的教程:

https://developer.confluence.io/tutorials/foreign-key-joins/kstreams.html

Kafka Streams 将在内部为您完成所有必需的联合分区。有关如何实现的详细信息,如果您有兴趣,请查看

https://www.confluence.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/

© www.soinside.com 2019 - 2024. All rights reserved.