我对 kafka 流完全陌生,我正在尝试在 quarkus 应用程序中使用它。 我尝试像数据库表一样使用它。
我想知道这是否是正确的方法,以及是否有办法检查 kafka 流是否已关闭或是否处于某种错误状态。
令我困惑的是,我试图关闭我的kafka实例,但在尝试读取值时我没有注意到任何类型的错误。
就我而言,我以这种方式初始化流:
public void start() {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, configurationProperties.kafkaStream().applicationId());
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
kTable = builder.table(topic, Materialized.as(configurationProperties.kafkaStream().storeName()));
streams = new KafkaStreams(builder.build(), properties);
streams.start();
keyValueStore = streams.store(StoreQueryParameters.fromNameAndType(configurationProperties.kafkaStream().storeName(), QueryableStoreTypes.keyValueStore()));
}
接下来,我有一些方法可以通过键检索值,如下所示:
private String getValue(String key) {
try {
return keyValueStore.get(key);
} catch (InvalidStateStoreException e) {
throw new StatusServiceException(e);
}
}
public int getNumber() {
final String number = getValue("number");
number = number != null && !number.isEmpty() ? Integer.parseInt(number) : 0;
emitter.send(Record.of("number", String.valueOf(number++)));
return number;
}
要发送到 kafka,我正在使用发射器:
@Inject
@Channel("status")
private Emitter<Record<String, String>> emitter;
是否有一些与此类似的示例或文档? (目前我已尝试遵循 Confluence 上提供的教程)
提前非常感谢您的帮助。
Ktable和状态存储位于Kafka流API提供的内存存储/缓存中。您从内存存储中读取记录,直到那里的记录可用并且您自己的服务启动并运行。如果关闭 Kafka 服务器,您将无法从 Kafka 读取数据,但可以连续从内存存储中的服务读取数据。
Kafka是一个日志系统,与DB表不完全匹配。例如,如果您在数据库表中插入一条记录并稍后更新它,则数据库中将只有一条记录。但 Kafka 将这两个事件(插入、更新)按顺序保留为两个不同的记录。