过去,我使用 Spring Kafka Streams 进行分组和聚合,并且工作非常顺利,但现在有一个可能的用例,我不确定如何根据需要进行工作。
我试图解决的用例是通过相关但不同的 ID 字段链接的传入消息。其中关系可能如下
例如:
{
id1: 1222
id2: 1333
},
{
id1: 1666
id2: 1777
},
{
id1: 1222
id2: 1444
},
{
id1: 1555
id2: 1444
},
{
id1: 1888
id2: 1666
}
会导致分成 2 个不同的组
{
id1: [1666, 1888]
id2: [1777, 1666]
},
{
id1: [1222, 1222, 1555]
id2: [1333, 1444, 1444]
}
我遇到的问题是,如果我们有超过 1 个分区(通常至少有 3 个),似乎没有办法保证链接的 ID 会进入同一个 Kafka 分区,这会破坏流分组功能。由于传入的消息流的大小相对较小,我将看看我们作为一个本质上单线程的应用程序可以实现什么样的吞吐量,但我想我会问是否有其他更好的方法来做到这一点,并且保持分区大于 1。
我能够结合使用正常的 groupBy 和在窗口之后与处理器进行聚合来实现此目的,然后检查当前的 stateStore 并更新它是否包含任何切向值。
基本 Kafka 流拓扑
messageStream
.groupByKey()
.windowedBy(...))
.aggregate(initializer, aggregator, merger, Materialized.with(...))
.toStream()
.process(transformerSupplier, SUPPRESSED_WINDOWED_TRANSACTION_KV_STORE)
.to("xxxx", Produced.with(...));
通过这个过程方法实现的transformerSupplier
@Override
public void process(Record record) {
Windowed<KafkaStreamKey> outputKey = (Windowed<KafkaStreamKey>) record.key();
KafkaStreamOutput outputValue = (KafkaStreamOutput) record.value();
AtomicInteger counter = new AtomicInteger(0);
if (log.isDebugEnabled()) {
log.debug(
String.format("Inserting to local keystore%nFull key [%s]%nValue [%s]", record.key(),
record.value()));
}
if (record != null && record.key() != null && record.value() != null) {
outputValue.setWindowedEndTime(outputKey.window().endTime());
kvStore.all().forEachRemaining(stringTestKafkaStreamOutputKeyValue -> {
List<String> tangentialKeys = new ArrayList<>();
tangentialKeys.addAll(((TestKafkaStreamOutput) record.value()).getId2());
tangentialKeys.addAll(((TestKafkaStreamOutput) record.value()).getId1());
if (stringTestKafkaStreamOutputKeyValue.value.getId1().stream()
.anyMatch(tangentialKeys::contains) ||
stringTestKafkaStreamOutputKeyValue.value.getId2().stream()
.anyMatch(tangentialKeys::contains)) {
//Combine the kvStore from incoming record and existing
List<String> newId1List = new ArrayList<>();
newId1List.addAll(stringTestKafkaStreamOutputKeyValue.value.getId1());
newId1List.addAll(((TestKafkaStreamOutput) record.value()).getId1());
List<String> newId2List = new ArrayList<>();
newId2List.addAll(stringTestKafkaStreamOutputKeyValue.value.getId2());
newId2List.addAll(((TestKafkaStreamOutput) record.value()).getId2());
kvStore.put(stringTestKafkaStreamOutputKeyValue.key,
TestKafkaStreamOutput.builder()
.Id1(newId1List.stream().sorted().distinct().toList())
.Id2(newId2List.stream().sorted().distinct().toList())
.windowedEndTime(outputValue.getWindowedEndTime())
.build());
counter.getAndIncrement();
}
});
if(counter.get() == 0) {
//normal add to the kvStore
kvStore.put(outputKey.key().getValue(), outputValue);
}
}