我的要求是跳过或避免使用kafka流DSL API从INPUT主题收到的重复消息(具有相同的密钥)。
如果出现任何故障,源系统可能会向INPUT主题发送重复消息。
流 -
源系统 - > INPUT主题 - > Kafka Streaming - > OUTPUT主题
目前我使用flatMap从有效负载生成多个密钥,但flatMap是无状态的,因此在从INPUT主题接收时无法避免重复的消息处理。
我正在寻找DSL API,它可以跳过从INPUT主题收到的重复记录,并在发送到OUTPUT主题之前生成多个键/值。
完全想到一旦配置在这里用于基于密钥重复删除从INPUT主题收到的消息,但看起来它不起作用,可能我不理解Exactly Once的用法。
能否请你说清楚。
我的要求是跳过或避免使用kafka流DSL API从INPUT主题收到的重复消息(具有相同的密钥)。
看看EventDeduplication
上的https://github.com/confluentinc/kafka-streams-examples示例,它就是这样做的。然后,您可以使用特定于您的用例所需的flatMap
功能来调整示例。
以下是该示例的要点:
final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the extractor
// function as needed.
() -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
storeName);
deduplicated.to(outputTopic);
和
/**
* @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event
* ID), during the time of which any incoming duplicates of
* the event will be dropped, thereby de-duplicating the
* input.
* @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
* records; if it returns null, the record will not be considered for
* de-duping but forwarded as-is.
*/
DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
if (maintainDurationPerEventInMs < 1) {
throw new IllegalArgumentException("maintain duration per event must be >= 1");
}
leftDurationMs = maintainDurationPerEventInMs / 2;
rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
this.idExtractor = idExtractor;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
}
public KeyValue<K, V> transform(final K key, final V value) {
final E eventId = idExtractor.apply(key, value);
if (eventId == null) {
return KeyValue.pair(key, value);
} else {
final KeyValue<K, V> output;
if (isDuplicate(eventId)) {
output = null;
updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
} else {
output = KeyValue.pair(key, value);
rememberNewEvent(eventId, context.timestamp());
}
return output;
}
}
private boolean isDuplicate(final E eventId) {
final long eventTime = context.timestamp();
final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
final boolean isDuplicate = timeIterator.hasNext();
timeIterator.close();
return isDuplicate;
}
private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
eventIdStore.put(eventId, newTimestamp, newTimestamp);
}
private void rememberNewEvent(final E eventId, final long timestamp) {
eventIdStore.put(eventId, timestamp, timestamp);
}
@Override
public void close() {
// Note: The store should NOT be closed manually here via `eventIdStore.close()`!
// The Kafka Streams API will automatically close stores when necessary.
}
}
我正在寻找DSL API,它可以跳过从INPUT主题收到的重复记录,并在发送到OUTPUT主题之前生成多个键/值。
DSL不包含开箱即用的功能,但上面的示例显示了如何通过使用Transformers
将DSL与Kafka Streams的处理器API相结合,轻松构建自己的重复数据删除逻辑。
完全想到一旦配置在这里用于基于密钥重复删除从INPUT主题收到的消息,但看起来它不起作用,可能我不理解Exactly Once的用法。
正如马蒂亚斯·J·萨克斯在他的回答中提到的,从卡夫卡的角度来看,这些“重复”从其一次性处理语义的角度来看并不重复。 Kafka确保它本身不会引入任何此类重复项,但它无法为上游数据源做出开箱即用的决策,而上游数据源是Kafka的黑盒子。
完全一次可以用于确保消费和处理输入主题,不会导致输出主题中的重复。但是,从一次性的角度来看,您描述的输入主题中的重复项并不是真正的重复,而是两个常规输入消息。
对于删除输入主题重复项,您可以使用带附加状态存储的transform()
步骤(DSL中没有内置运算符可以执行您想要的操作)。对于每个输入记录,首先检查是否在商店中找到了相应的密钥。如果没有,则将其添加到商店并转发消息。如果您在商店中找到它,则将输入删除为重复。请注意,如果您在Kafka Streams应用程序中启用了一次性处理,则此功能仅适用于100%正确性保证。其他人,即使您尝试进行重复数据删除,Kafka Streams也可以在发生故障时重新引入重复。
此外,您需要确定要在商店中保留条目的时间。如果您确定输入主题中没有进一步的重复,则可以使用Punctuation
从商店中删除旧数据。一种方法是将存储记录时间戳(或可能是偏移量)存储在商店中。这样,您可以将当前时间与punctuate()
中的商店记录时间进行比较,并删除旧记录(即,您将通过store#all()
对商店中的所有条目进行迭代)。
在transform()
之后你应用你的flatMap()
(或者也可以将你的flatMap()
代码直接合并到transform()
。
谢谢Matt和Michel的帮助。非常感激。
我正在考虑使用FlatMap和FilterNot API的组合。就像州商店一样,我们将交易细节存储到cassandra中。
FilterNot - 逻辑可以包括连接Cassandra和检查重复项。 FlatMap - 逻辑包括生成多个键/值并将其发送到OUTPUT主题。
如果与Cassandra的连接失败以及第一个建议的方法 - 在每天数百万笔交易,保留期等情况下的statestore的可持续性,请关注此处。
请让我知道哪种方法更好。