我使用EmbeddedKafkaBroker编写了一个简单的测试,我创建了一个测试生产者并发送了一条消息,但是我的KafkaListener没有被触发,所以测试每次都失败。有没有办法测试我的 Kafka 消费者,以便确保测试代码覆盖率?我希望我的假 Producer ( ProducerTest ) 从测试类内部触发我的“真实”Kafka Consumer 并处理消息。
卡夫卡消费者:
@Component
@EnableAutoConfiguration
@Slf4j
public class MyKafkaListener {
@KafkaListener(topics = "${kafka.topic.name}")
public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
try {
log.info("Reading message: " + message);
//do stuff, process message
} catch (Exception e) {
log.error("Error while reading message from topic", e);
}
}
}
我的测试课:
@Slf4j
@ExtendWith(SpringExtension.class)
@ActiveProfiles("local")
@TestInstance(PER_CLASS)
@EmbeddedKafka(topics = { "test-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@RunWith(SpringRunner.class)
@DirtiesContext
@Disabled
@SpringBootTest
public class MyKafkaListenerTest {
private KafkaTemplate<String, String> producer;
public static final String TEST_KEY = "x";
public static final String TOPIC = "test-topic";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Test
public void myKafkaListener_success_test() throws InterruptedException, ExecutionException {
//insert object first so I can later assert that it was modified after receiving message from producer
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
log.info("props {}", producerProps);
Producer<String, String> producerTest = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer());
producerTest.send(new ProducerRecord(TOPIC, "", TEST_KEY));
Thread.sleep(5000);
//Assertions.assertNull(condition to assert message has been processed);
producerTest.close();
}
我尝试调试我的代码,但 Kafka Listener 没有被触发,这是我的测试应用程序.yaml:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
jaas:
enabled: true
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="{USERNAME}" password="{PASSWORD}";
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
properties:
sasl:
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
request:
timeout:
ms: 20000
group-id: kafka-list-app
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
retries: 10
properties:
sasl:
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
request:
timeout:
ms: 20000
max:
in:
flight:
requests:
per:
connection: 1
kafka:
topic:
name: ${TOPIC_NAME:profil.topic-dev}
我也总是收到以下错误:
Connection to node -1 (kubernetes.docker.internal/127.0.0.1:51131) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
首先,您需要从测试类中删除
@Disabled
。如果生产者没有发送任何内容,则不会调用消费者侦听器。
我看到您的代理配置包含 JAAS / SASL 属性,这些属性在测试中的生产者配置中缺失。根据错误,无法对客户端进行身份验证。也许你应该打印出
producerProps
并进行调试。
要使用您在配置文件中定义的生产者属性,您需要使用
private KafkaTemplate<String, String> producer;
字段,而不是 producerTest
局部变量
此外,根据 Spring Boot 文档,您的
properties
条目应如下所示
properties:
[sasl.mechanism]: PLAIN
[security.protocol]: SASL_PLAINTEXT
[request.timeout.ms]: 20000
[max.in.flight.requests.per.connection]: 1
要从 Spring 配置向客户端添加身份验证,您需要在生产者和消费者配置中的某个位置设置
sasl.jaas.config
。否则,您将继续收到身份验证用户名/密码被拒绝的错误。
properties:
[sasl.jaas.config]: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";'
如果代理不使用 SASL 并且您为
SASL_SSL
属性指定 SASL_PLAINTEXT
或 security.protocol
(就像您在问题中的 application.yaml
文件中所做的那样),您将收到此错误消息。
尝试使用
PLAIN
(在您的具体情况下)或 SSL
代替 security.protocol
将排除这种可能性。