Kafka Stateless KStream:异常时如何提交直到失败点?

问题描述 投票:0回答:1

假设 Kafka 流按顺序发出事件 1,2,3,4,5。当事件 5 出错时,kafka 流处理器按预期退出。但是,直到事件 4 为止它无法提交。当我重新启动 Spring boot 流应用程序时,流再次从事件 1,2,3,4 开始,然后在事件 5 处失败。

为了更清楚起见,我正在阅读流中的“最新”消息。该主题有 10 条消息,因此它不会从一开始就流式传输。

当第 5 个事件在我的流处理器线程退出之前失败时,在事件 4 之前提交的适当方法是什么?

我的 Kafka Streams 配置:

 @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  KafkaStreamsConfiguration kStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ms-orders-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
    return new KafkaStreamsConfiguration(props);

我的拓扑代码:

@Autowired
  public Topology buildTopology(StreamsBuilder streamsBuilder) {

    Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
        schemaRegistryUrl);

    Serde valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false);

    var crmOrderStream = streamsBuilder.stream(S_ORDERS,
        Consumed.with(Serdes.String(), valueGenericAvroSerde));

    crmOrderStream
        .mapValues(orderValueMapper())
        .filter(orderCompletePredicate())
        .mapValues(enrichedOrderValueMapper());
    return streamsBuilder.build();
  }
spring-boot apache-kafka apache-kafka-streams confluent-platform
1个回答
0
投票

看起来“processing.guarantee”配置具有“exactly_once”值可以避免重复记录。

只是为面临此问题的其他人记录。

© www.soinside.com 2019 - 2024. All rights reserved.