I have a Spring Boot application that uses the Spring Cloud Stream Kafka Streams binder. It defines a topology with the following picture:
The green numbers show the number of messages passed through the topology defined by the individual processors bound via the Spring Cloud Stream Kafka Streams binder. These are the relevant properties:
spring.cloud.stream.bindings:
  ...
  hint1Stream-out-0:
    destination: hints
  realityStream-out-0:
    destination: hints
  countStream-in-0:
    destination: hints
I am counting the messages produced/consumed by each processor using peek() calls like this:
return stream -> {
    stream
        .peek((k, v) -> input0count.incrementAndGet())
        ...
        .peek((k, v) -> output0count.incrementAndGet());
};
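The counting pattern above can be sketched with plain java.util.stream as a Kafka-free stand-in for KStream.peek(); the class name and sample values here are hypothetical, and in the real application the counters are fields such as input0count:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class PeekCountDemo {

    // Counts records entering and leaving a filter stage via peek(),
    // mirroring how the question counts messages flowing through a KStream.
    static long[] countThroughFilter(List<String> hints) {
        AtomicLong in = new AtomicLong();
        AtomicLong out = new AtomicLong();
        hints.stream()
             .peek(h -> in.incrementAndGet())       // every record entering the stage
             .filter(h -> !"reality".equals(h))     // stand-in for a dropping processor
             .peek(h -> out.incrementAndGet())      // every record that survived
             .forEach(h -> { /* terminal op drives the pipeline */ });
        return new long[] { in.get(), out.get() };
    }

    public static void main(String[] args) {
        long[] counts = countThroughFilter(List.of("reality", "hint1", "hint2", "reality"));
        System.out.println(counts[0] + " in, " + counts[1] + " out"); // prints "4 in, 2 out"
    }
}
```

The terminal forEach matters: without a terminal operation the intermediate peek() callbacks never run, which is analogous to a KStream branch that is never materialized.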
I start my application from a unit test using embedded Kafka with near-default settings:
@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
    topics = {
        ...
        TOPIC_HINTS
    }
)
public class MyApplicationTests {
...
In my test I wait long enough for all the published test messages to arrive at the countStream:
CountDownLatch latch = new CountDownLatch(1);
...
publishFromCsv(...)
...
latch.await(30, TimeUnit.SECONDS);
logCounters();
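A minimal sketch of that wait pattern, assuming the latch is released from the same callback that increments the message counter (LatchWaitDemo and awaitMessages are hypothetical names; in the real test the increments come from peek()):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LatchWaitDemo {

    // Blocks until `expected` messages have been counted, or until the
    // timeout expires; returns true only if all messages arrived in time.
    static boolean awaitMessages(int expected, long timeoutMs) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicLong counted = new AtomicLong();

        // Simulated arrivals on another thread; in the real test these
        // increments happen inside the peek() callback of the topology.
        Thread arrivals = new Thread(() -> {
            for (int i = 0; i < expected; i++) {
                if (counted.incrementAndGet() == expected) {
                    latch.countDown(); // release the waiting test thread
                }
            }
        });
        arrivals.start();

        boolean allArrived = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        arrivals.join();
        return allArrived;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitMessages(3786, 30_000) ? "all messages arrived" : "timed out");
    }
}
```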
As you can see, the total number of messages put on the "hints" topic does not match the message count on the "counterStream" side: 1309 + 2589 != 3786.
Am I missing some Kafka or Kafka Streams setting that would flush every batch? Maybe my custom TimestampExtractor generates timestamps that are "too old"? (I am fairly sure their values are not less than zero.) Maybe it is related to Kafka log compaction?
What could be the reason for this mismatch?
UPDATE
While the test was waiting on its timeout, I checked the offsets of the underlying topic by executing:
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:60231 --topic hints
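An alternative way to count what actually landed on the topic (a sketch; the broker port comes from the embedded Kafka instance, and the timeout just stops the consumer once the topic is drained) is to pipe the console consumer through wc:

```shell
kafka-console-consumer --bootstrap-server localhost:60231 --topic hints \
    --from-beginning --timeout-ms 10000 | wc -l
```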
As expected, the number of messages in the topic equals the sum of the two input stream counts. However, the number of messages that arrive at the counterStream input is still a few dozen short of what is expected.
Other Kafka configuration in use:
spring.cloud.stream.kafka.streams:
  configuration:
    schema.registry.url: mock://torpedo-stream-registry
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    commit.interval.ms: 100
That corresponds to processing.guarantee = at_least_once. I cannot test processing.guarantee = exactly_once, since that would require a cluster of at least 3 brokers.
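For reference, the guarantee is configured like any other Streams property; a sketch of the relevant fragment (at_least_once is already the default, so setting it explicitly only documents the intent):

```yaml
spring.cloud.stream.kafka.streams:
  configuration:
    # exactly_once writes to Kafka's transaction state log, which by default
    # requires a replication factor of 3, hence at least 3 brokers
    processing.guarantee: at_least_once
```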
I also tried setting:
spring.cloud.stream.kafka.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams:
  default:
    consumer:
      startOffset: earliest
spring.cloud.stream.bindings:
  countStream-in-0:
    destination: hints
    consumer:
      startOffset: earliest
      concurrency: 1
None of that helped :(
What did help was leaving only the stream.peek(..) call in the countStream consumer:
@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> input0count.incrementAndGet());
    };
}
In that case I immediately start seeing the expected number of messages counted on the countConsumer side.
That means the internals of my Counts Consumer influence the behavior.
Here is the full version that does "not work":
@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> notifyObservers(input0count.incrementAndGet()));

        KStream<String, Hint> realityStream = kstream
            .filter((key, hint) -> realityDetector.getName().equals(hint.getDetector()));

        KStream<String, Hint> hintsStream = kstream
            .filter((key, hint) -> !realityDetector.getName().equals(hint.getDetector()));

        this.countsTable = kstream
            .groupBy((key, hint) -> key.concat(":").concat(hint.getDetector()))
            .count(Materialized.as("countsTable"));

        this.countsByActionTable = kstream
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()))
            .count(Materialized.as("countsByActionTable"));

        this.countsByHintRealityTable = hintsStream
            .join(realityStream,
                (hint, real) -> {
                    hint.setReal(real.getHint());
                    return hint;
                }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()).concat("-")
                .concat(hint.getReal().toString())
            )
            .count(Materialized.as("countsByHintRealityTable"));
    };
}
I store the counts in several KTables there; that is what happens inside the Counts Consumer.
UPDATE 2
The last block of the Count Consumer apparently causes the initially unexpected behavior:
this.countsByHintRealityTable = hintsStream
    .join(realityStream,
        (hint, real) -> {
            hint.setReal(real.getHint());
            return hint;
        }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
    .groupBy((key, hint) -> key.concat(":")
        .concat(hint.getDetector()).concat("|")
        .concat(hint.getHint().toString()).concat("-")
        .concat(hint.getReal().toString())
    )
    .count(Materialized.as("countsByHintRealityTable"));
Without it, the message counts match as expected.
How can downstream code like this affect the Consumer's KStream input?
I think the following has helped me solve the problem: