我们有一个集成测试,我们使用 EmbeddedKafka 并向某个主题生成一条消息,我们的应用程序处理该消息,然后将结果发送到我们使用并断言输出的第二个主题。在 CI 中,这可能在 2/3 的时间内有效,但我们会遇到
KafkaTestUtils.getSingleRecord
抛出 java.lang.IllegalStateException: No records found for topic
的情况(请参阅下面的 [1])。
为了尝试解决此问题,我为注册表中的每个侦听器容器添加了
ContainerTestUtils.waitForAssignment
(请参阅下面的 [2])。在 CI 中成功运行几次后,我看到了一个新的异常:java.lang.IllegalStateException: Expected 1 but got 0 partitions
。现在我想知道这是否实际上是未找到记录的原始异常的根本原因。
有什么想法可以帮助解决这里的随机故障吗?我将不胜感激任何有关如何排除故障的建议。
spring-kafka 和 spring-kafka-test v2.6.4.
编辑:添加
newConsumer
以供参考。
我们的设置示例:
@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
topics = { "topic1","topic2" },
partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9099", "port=9099"})
public class IntegrationTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
public void testExample() {
try (Consumer<String, String> consumer = newConsumer()) {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
[2]
ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
}
try (Producer<String, String> producer = newProducer()) {
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic2"); // [1]
producer.send(new ProducerRecord<>(
"topic1",
"test payload"));
producer.flush();
}
String result = KafkaTestUtils.getSingleRecord(consumer, "topic2").value();
assertEquals(result, "expected result");
}
}
private Consumer<String, String> newConsumer() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupId", "false", embeddedKafkaBroker);
ConsumerFactory<String, AssetTransferResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(
consumerProps,
new StringDeserializer(),
new CustomDeserializer<>());
return consumerFactory.createConsumer();
}
}
我也遇到了类似的问题,尽管我的预期是 24 个分区而不是 0。最后,这是因为我的 NewTopic Bean 有一个硬编码的分区数量,就像这样
@Bean
public NewTopic domainEventsTopic(
Topics topics,
@Value("${spring.kafka.admin.replication}") short replicationFactor,) {
return new NewTopic(topics.domainEvents(), 24, replicationFactor);
}
这与我的测试不符
ContainerTestUtils.waitForAssignment(container, partitions); // where partions is 1
我的案例的修复方法是将
spring.kafka.admin.partitions
添加到我的 application.yaml,然后创建一个 application-test.yaml,其中 spring.kafka.admin.partitions
为 1,而不是 24。
并像这样更新 bean
@Bean
public NewTopic domainEventsTopic(
Topics topics,
@Value("${spring.kafka.admin.replication}") short replicationFactor,
@Value("${spring.kafka.admin.partitions}") Integer partitions) {
return new NewTopic(topics.domainEvents(), partitions, replicationFactor);
}