防止 Kafka Streams Consumer 写入偏移量/等待一个流消耗完所有记录后再启动第二个流

问题描述 投票:0回答:1

这是一个关于流的二合一问题。

我正在开发一项由两个流组成的服务。一个(第一个)应该消耗整个主题,接收键/值对并将其信息存储在本地 HashMap 中

一旦该流不再有延迟,第二个流就会启动并消耗另一个主题。根据使用的数据及其属性之一,根据 HashMap 条目决定是删除记录还是进一步处理记录。 因此,第一个流需要在第二个流开始之前到达其主题的末尾。 我想保持服务无状态,因此不将数据保存在状态存储中。这会产生两个问题:

  1. 可以配置普通的 Kafka 消费者以防止其写入偏移量,从而在每次重新启动时一遍又一遍地消费该主题。
enable.auto.commit = false
auto.offset.reset = earliest

但是对于流来说这不起作用。我的临时解决方案是生成一个带有随机部分的 ApplicationID,以便忽略先前写入的偏移量。 这会为每个实例生成一个新的消费者组,从而在代理上产生许多组。

-> 有没有办法将流客户端配置为不写入偏移量?

  1. 为了让第二个流等待第一个主题被消耗,我尝试了不同的实现。我的最后一种方法如下:
      try {
        firstStreams.start();
        // Waiting for consumer to start...
        while (computeLag(firstStreams.metrics().entrySet()) < 1) {
          sleep(100);
        }
        // Waiting for Lag to reach 0
        while (computeLag(firstStreams.metrics().entrySet()) > 1) {
          sleep(100);
        }

        secondStream.start();
        shutdownLatch.await();
      } catch (Throwable e) {
        System.exit(1);
      }
    }
    System.exit(0);
  }
  
   private static double computeLag(Set<? extends Map.Entry<MetricName, ? extends Metric>> metrics) {
    return metrics.stream()
        .filter(entries -> entries.getKey().name().equals("records-lag"))
        .map(entry -> entry.getValue().metricValue().toString())
        .map(Double::parseDouble)
        .mapToDouble(Double::doubleValue)
        .sum();
  }

只要消费者未运行,消费者指标就会返回 0.0。成功启动后,它会返回所有分区的延迟。

这个版本可以工作,但似乎错误且复杂。

-> 有没有正确的方法来等待流到达“末尾”?

提前致谢并致以诚挚的问候

java apache-kafka apache-kafka-streams
1个回答
0
投票

将流客户端配置为不写入偏移量?

据我所知,没有 Kafka Streams。至少,没有状态处理

有没有正确的方法来等待流到达“结束”?

没有。您所做的似乎不错,但我认为它无法扩展到一个分区/实例之外。


接收键/值对并将其信息存储在本地 HashMap 中

这并不是 Kafka Steams 真正可接受的用例。您应该使用 KTable 和交互式查询进行查找,或连接来过滤其他流

© www.soinside.com 2019 - 2024. All rights reserved.