假设 Kafka 流按顺序发出事件 1,2,3,4,5。当事件 5 出错时,kafka 流处理器按预期退出。但是,直到事件 4 为止它无法提交。当我重新启动 Spring boot 流应用程序时,流再次从事件 1,2,3,4 开始,然后在事件 5 处失败。
为了更清楚起见,我正在阅读流中的“最新”消息。该主题有 10 条消息,因此它不会从一开始就流式传输。
当第 5 个事件在我的流处理器线程退出之前失败时,在事件 4 之前提交的适当方法是什么?
我的 Kafka Streams 配置:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ms-orders-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
return new KafkaStreamsConfiguration(props);
我的拓扑代码:
@Autowired
public Topology buildTopology(StreamsBuilder streamsBuilder) {
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
schemaRegistryUrl);
Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);
var crmOrderStream = streamsBuilder.stream(S_ORDERS,
Consumed.with(Serdes.String(), valueGenericAvroSerde));
crmOrderStream
.mapValues(orderValueMapper())
.filter(orderCompletePredicate())
.mapValues(enrichedOrderValueMapper());
return streamsBuilder.build();
}
看起来“processing.guarantee”配置具有“exactly_once”值可以避免重复记录。
只是为面临此问题的其他人记录。