如何使用Kafka流DSL功能处理重复的消息

问题描述 投票:0回答:3

我的要求是跳过或避免使用kafka流DSL API从INPUT主题收到的重复消息(具有相同的密钥)。

如果出现任何故障,源系统可能会向INPUT主题发送重复消息。

流 -

源系统 - > INPUT主题 - > Kafka Streaming - > OUTPUT主题

目前我使用flatMap从有效负载生成多个密钥,但flatMap是无状态的,因此在从INPUT主题接收时无法避免重复的消息处理。

我正在寻找DSL API,它可以跳过从INPUT主题收到的重复记录,并在发送到OUTPUT主题之前生成多个键/值。

完全想到一旦配置在这里用于基于密钥重复删除从INPUT主题收到的消息,但看起来它不起作用,可能我不理解Exactly Once的用法。

能否请你说清楚。

apache-kafka-streams
3个回答
0
投票

我的要求是跳过或避免使用kafka流DSL API从INPUT主题收到的重复消息(具有相同的密钥)。

看看EventDeduplication上的https://github.com/confluentinc/kafka-streams-examples示例,它就是这样做的。然后,您可以使用特定于您的用例所需的flatMap功能来调整示例。

以下是该示例的要点:

final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
    // In this example, we assume that the record value as-is represents a unique event ID by
    // which we can perform de-duplication.  If your records are different, adapt the extractor
    // function as needed.
    () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
    storeName);
deduplicated.to(outputTopic);

    /**
     * @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event
     *                                     ID), during the time of which any incoming duplicates of
     *                                     the event will be dropped, thereby de-duplicating the
     *                                     input.
     * @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
     *                    records; if it returns null, the record will not be considered for
     *                    de-duping but forwarded as-is.
     */
    DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
      if (maintainDurationPerEventInMs < 1) {
        throw new IllegalArgumentException("maintain duration per event must be >= 1");
      }
      leftDurationMs = maintainDurationPerEventInMs / 2;
      rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
      this.idExtractor = idExtractor;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
      this.context = context;
      eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
    }

    public KeyValue<K, V> transform(final K key, final V value) {
      final E eventId = idExtractor.apply(key, value);
      if (eventId == null) {
        return KeyValue.pair(key, value);
      } else {
        final KeyValue<K, V> output;
        if (isDuplicate(eventId)) {
          output = null;
          updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
        } else {
          output = KeyValue.pair(key, value);
          rememberNewEvent(eventId, context.timestamp());
        }
        return output;
      }
    }

    private boolean isDuplicate(final E eventId) {
      final long eventTime = context.timestamp();
      final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
          eventId,
          eventTime - leftDurationMs,
          eventTime + rightDurationMs);
      final boolean isDuplicate = timeIterator.hasNext();
      timeIterator.close();
      return isDuplicate;
    }

    private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
      eventIdStore.put(eventId, newTimestamp, newTimestamp);
    }

    private void rememberNewEvent(final E eventId, final long timestamp) {
      eventIdStore.put(eventId, timestamp, timestamp);
    }

    @Override
    public void close() {
      // Note: The store should NOT be closed manually here via `eventIdStore.close()`!
      // The Kafka Streams API will automatically close stores when necessary.
    }

  }

我正在寻找DSL API,它可以跳过从INPUT主题收到的重复记录,并在发送到OUTPUT主题之前生成多个键/值。

DSL不包含开箱即用的功能,但上面的示例显示了如何通过使用Transformers将DSL与Kafka Streams的处理器API相结合,轻松构建自己的重复数据删除逻辑。

完全想到一旦配置在这里用于基于密钥重复删除从INPUT主题收到的消息,但看起来它不起作用,可能我不理解Exactly Once的用法。

正如马蒂亚斯·J·萨克斯在他的回答中提到的,从卡夫卡的角度来看,这些“重复”从其一次性处理语义的角度来看并不重复。 Kafka确保它本身不会引入任何此类重复项,但它无法为上游数据源做出开箱即用的决策,而上游数据源是Kafka的黑盒子。


0
投票

完全一次可以用于确保消费和处理输入主题,不会导致输出主题中的重复。但是,从一次性的角度来看,您描述的输入主题中的重复项并不是真正的重复,而是两个常规输入消息。

对于删除输入主题重复项,您可以使用带附加状态存储的transform()步骤(DSL中没有内置运算符可以执行您想要的操作)。对于每个输入记录,首先检查是否在商店中找到了相应的密钥。如果没有,则将其添加到商店并转发消息。如果您在商店中找到它,则将输入删除为重复。请注意,如果您在Kafka Streams应用程序中启用了一次性处理,则此功能仅适用于100%正确性保证。其他人,即使您尝试进行重复数据删除,Kafka Streams也可以在发生故障时重新引入重复。

此外,您需要确定要在商店中保留条目的时间。如果您确定输入主题中没有进一步的重复,则可以使用Punctuation从商店中删除旧数据。一种方法是将存储记录时间戳(或可能是偏移量)存储在商店中。这样,您可以将当前时间与punctuate()中的商店记录时间进行比较,并删除旧记录(即,您将通过store#all()对商店中的所有条目进行迭代)。

transform()之后你应用你的flatMap()(或者也可以将你的flatMap()代码直接合并到transform()


0
投票

谢谢Matt和Michel的帮助。非常感激。

我正在考虑使用FlatMap和FilterNot API的组合。就像州商店一样,我们将交易细节存储到cassandra中。

FilterNot - 逻辑可以包括连接Cassandra和检查重复项。 FlatMap - 逻辑包括生成多个键/值并将其发送到OUTPUT主题。

如果与Cassandra的连接失败以及第一个建议的方法 - 在每天数百万笔交易,保留期等情况下的statestore的可持续性,请关注此处。

请让我知道哪种方法更好。

© www.soinside.com 2019 - 2024. All rights reserved.