Spring Cloud Stream Kafka Streams: downstream message count does not match the total number of messages sent to the topic


I have a Spring Boot application that uses the Spring Cloud Stream Kafka Streams binder. It defines a topology that looks like this:

[Image: topology diagram with per-processor message counts in green]

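The green numbers show how many messages pass through the topology defined by the individual processors, which are bound via the Spring Cloud Stream Kafka Streams binder. Here are the relevant properties: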

spring.cloud.stream.bindings:
  ...
  hint1Stream-out-0:
    destination: hints
  realityStream-out-0:
    destination: hints
  countStream-in-0:
    destination: hints

I count the messages consumed and produced by each processor using peek(), like this:

return stream -> {
    stream
        .peek((k, v)-> input0count.incrementAndGet())
        ...
        .peek((k, v)-> output0count.incrementAndGet())
};

I start the application from a unit test, using an embedded Kafka with almost default settings:

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
                ...
                TOPIC_HINTS
        }
)
public class MyApplicationTests {
...

In the test, I wait long enough for all published test messages to reach countStream:

CountDownLatch latch = new CountDownLatch(1);
...
publishFromCsv(...)
...
latch.await(30, TimeUnit.SECONDS);
logCounters();
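
The snippet above does not show where the latch is counted down. One way to tie it to the counters is sketched below, using only the JDK. `ExpectedCountLatch` and its methods are hypothetical names, not part of the original test; the assumption is that `record()` would be called from the peek() lambda once per message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical test helper: counts records and releases a latch at the expected total. */
final class ExpectedCountLatch {
    private final long expected;
    private final AtomicLong seen = new AtomicLong();
    private final CountDownLatch done = new CountDownLatch(1);

    ExpectedCountLatch(long expected) {
        this.expected = expected;
        if (expected <= 0) {
            done.countDown(); // nothing to wait for
        }
    }

    /** Call once per record, e.g. from a peek() lambda; safe to call from several threads. */
    long record() {
        long n = seen.incrementAndGet();
        if (n >= expected) {
            done.countDown(); // further calls after the latch reaches zero are no-ops
        }
        return n;
    }

    /** Returns true if the expected total arrived in time, false on timeout or interrupt. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return false;
        }
    }

    long seen() {
        return seen.get();
    }

    /** Number of records still missing relative to the expected total. */
    long missing() {
        return Math.max(0, expected - seen.get());
    }
}
```

With such a helper the test stops waiting as soon as the expected total is reached, and when `await` returns false, `missing()` reports how many messages had not arrived by the timeout.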

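As you can see, the total number of messages put into the "hints" topic does not match the number of messages on the countStream side: 1309 + 2589 = 3898, but only 3786 arrive.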

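Am I missing some Kafka or Kafka Streams setting that flushes each batch? Maybe my custom TimestampExtractor generates timestamps that are "too old"? (I'm fairly sure they are never less than zero.) Or could it be related to Kafka log compaction?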

What could be the reason for this mismatch?

Update

I checked the offsets of the underlying topic by running the following command while the test was waiting for its timeout:

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:60231 --topic hints

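As expected, the number of messages in the topic equals the sum of the two input stream counts. The number of messages delivered to the countStream input is still a few dozen lower than expected.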
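
To compare the offset check with the counters programmatically, the tool's output can be summed. The sketch below assumes GetOffsetShell prints one `topic:partition:offset` line per partition; `OffsetSum` is a hypothetical helper name. The end offset only equals the message count when the log starts at offset 0 and contains no transaction markers, which is plausible for a fresh embedded broker but remains an assumption.

```java
import java.util.List;

/** Hypothetical helper: sums end offsets from lines shaped like "topic:partition:offset". */
final class OffsetSum {
    static long totalEndOffset(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines in the captured output
            }
            // The offset is the last colon-separated field.
            int idx = trimmed.lastIndexOf(':');
            total += Long.parseLong(trimmed.substring(idx + 1));
        }
        return total;
    }
}
```

For the single-partition topic in this test there is only one line to parse, so the sum is just that partition's end offset.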

Other Kafka configuration in use:

spring.cloud.stream.kafka.streams:
    configuration:
      schema.registry.url: mock://torpedo-stream-registry
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      commit.interval.ms: 100

This corresponds to processing.guarantee = at_least_once. I could not test processing.guarantee = exactly_once, because that requires a cluster of at least 3 brokers.

Also setting:

spring.cloud.stream.kafka.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams:
  default:
    consumer:
      startOffset: earliest
spring.cloud.stream.bindings:
  countStream-in-0:
    destination: hints
    consumer:
      startOffset: earliest
      concurrency: 1

did not help :(

What did help was leaving only stream.peek(..) in the countStream consumer:

@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> input0count.incrementAndGet());
    };
}

In this case, I immediately get the expected number of messages counted on the Count Consumer side.

This means that the internals of my Count Consumer affect the behavior.

Here is its full version, the one that "doesn't work":

@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> notifyObservers(input0count.incrementAndGet()));

        KStream<String, Hint> realityStream = kstream
            .filter((key, hint) -> realityDetector.getName().equals(hint.getDetector()));

        KStream<String, Hint> hintsStream = kstream
            .filter((key, hint) -> !realityDetector.getName().equals(hint.getDetector()));

        this.countsTable = kstream
            .groupBy((key, hint) -> key.concat(":").concat(hint.getDetector()))
            .count(Materialized
                .as("countsTable"));

        this.countsByActionTable = kstream
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()))
            .count(Materialized
                .as("countsByActionTable"));

        this.countsByHintRealityTable = hintsStream
            .join(realityStream,
                (hint, real) -> {
                    hint.setReal(real.getHint());
                    return hint;
                }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()).concat("-")
                .concat(hint.getReal().toString())
            )
            .count(Materialized
                .as("countsByHintRealityTable"));

    };
}

I store the counts in several KTables there. This is what happens inside the Count Consumer:

[Image: diagram of the Count Consumer internals]

Update 2

The last piece of the Count Consumer is apparently what causes the initial unexpected behavior:

this.countsByHintRealityTable = hintsStream
        .join(realityStream,
            (hint, real) -> {
                hint.setReal(real.getHint());
                return hint;
            }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
        .groupBy((key, hint) -> key.concat(":")
            .concat(hint.getDetector()).concat("|")
            .concat(hint.getHint().toString()).concat("-")
            .concat(hint.getReal().toString())
        )
        .count(Materialized
            .as("countsByHintRealityTable"));

Without it, the message counts match as expected.

How can such downstream code affect the consumer's KStream input?


apache-kafka apache-kafka-streams spring-cloud-stream
1 answer
0 votes

I believe the following helped me solve the problem:
