从 Kafka Streams 读取值并检查错误

问题描述 投票:0回答:1

我对 kafka 流完全陌生,我正在尝试在 quarkus 应用程序中使用它。 我尝试像数据库表一样使用它。

我想知道这是否是正确的方法,以及是否有办法检查 kafka 流是否已关闭或是否处于某种错误状态。

令我困惑的是,我试图关闭我的kafka实例,但在尝试读取值时我没有注意到任何类型的错误。

就我而言,我以这种方式初始化流:

public void start() {
   Properties properties = new Properties();
   properties.put(StreamsConfig.APPLICATION_ID_CONFIG,   configurationProperties.kafkaStream().applicationId());
   properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
   properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
   properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

   kTable = builder.table(topic, Materialized.as(configurationProperties.kafkaStream().storeName()));

   streams = new KafkaStreams(builder.build(), properties);
   streams.start();

   keyValueStore = streams.store(StoreQueryParameters.fromNameAndType(configurationProperties.kafkaStream().storeName(), QueryableStoreTypes.keyValueStore()));
}

接下来,我有一些方法可以通过键检索值,如下所示:

  private String getValue(String key) {
    try {
      return keyValueStore.get(key);
    } catch (InvalidStateStoreException e) {
      throw new StatusServiceException(e);
    }
  }

  public int getNumber() {
    final String number = getValue("number");
    number = number != null && !number.isEmpty() ? Integer.parseInt(number) : 0;

    emitter.send(Record.of("number", String.valueOf(number++)));
    return number;
  }

要发送到 kafka,我正在使用发射器:

  @Inject
  @Channel("status")
  private Emitter<Record<String, String>> emitter;

是否有一些与此类似的示例或文档? (目前我已尝试遵循 Confluence 上提供的教程)

提前非常感谢您的帮助。

apache-kafka quarkus apache-kafka-streams
1个回答
0
投票

Ktable和状态存储位于Kafka流API提供的内存存储/缓存中。您从内存存储中读取记录,直到那里的记录可用并且您自己的服务启动并运行。如果关闭 Kafka 服务器,您将无法从 Kafka 读取数据,但可以连续从内存存储中的服务读取数据。

Kafka是一个日志系统,与DB表不完全匹配。例如,如果您在数据库表中插入一条记录并稍后更新它,则数据库中将只有一条记录。但 Kafka 将这两个事件(插入、更新)按顺序保留为两个不同的记录。

© www.soinside.com 2019 - 2024. All rights reserved.