EmbeddedKafka 身份验证失败,原因是:与客户端机制 PLAIN 发生意外的握手请求,启用的机制为 []

问题描述 投票:0回答:2

我使用EmbeddedKafkaBroker编写了一个简单的测试,我创建了一个测试生产者并发送了一条消息,但是我的KafkaListener没有被触发,所以测试每次都失败。有没有办法测试我的 Kafka 消费者,以便确保测试代码覆盖率?我希望我的假 Producer ( ProducerTest ) 从测试类内部触发我的“真实”Kafka Consumer 并处理消息。

卡夫卡消费者:

@Component
@EnableAutoConfiguration
@Slf4j
public class MyKafkaListener {

    @KafkaListener(topics = "${kafka.topic.name}")
    public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        try {
            log.info("Reading message: " +  message);
            //do stuff, process message
        } catch (Exception e) {
            log.error("Error while reading message from topic", e);
        }

    }

}

我的测试课:

@Slf4j
@ExtendWith(SpringExtension.class)
@ActiveProfiles("local")
@TestInstance(PER_CLASS)
@EmbeddedKafka(topics = { "test-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@RunWith(SpringRunner.class)
@DirtiesContext
@Disabled
@SpringBootTest
public class MyKafkaListenerTest {

    private KafkaTemplate<String, String> producer;

    public static final String TEST_KEY = "x";
    public static final String TOPIC = "test-topic";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void myKafkaListener_success_test() throws InterruptedException, ExecutionException {
        //insert object first so I can later assert that it was modified after receiving message from producer

        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
        log.info("props {}", producerProps);
        Producer<String, String> producerTest = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer());
        producerTest.send(new ProducerRecord(TOPIC, "", TEST_KEY));
        Thread.sleep(5000);
        //Assertions.assertNull(condition to assert message has been processed);
        producerTest.close();

}

我尝试调试我的代码,但 Kafka Listener 没有被触发,这是我的测试应用程序.yaml:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    jaas:
      enabled: true
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule  required username="{USERNAME}" password="{PASSWORD}";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
        request:
          timeout:
            ms: 20000
      group-id: kafka-list-app

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      retries: 10
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
        request:
          timeout:
            ms: 20000
        max:
          in:
            flight:
              requests:
                per:
                  connection: 1

kafka:
  topic:
    name: ${TOPIC_NAME:profil.topic-dev}

我也总是收到以下错误:

Connection to node -1 (kubernetes.docker.internal/127.0.0.1:51131) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
spring-boot apache-kafka kafka-consumer-api spring-kafka spring-kafka-test
2个回答
0
投票

首先,您需要从测试类中删除

@Disabled
。如果生产者没有发送任何内容,则不会调用消费者侦听器。

我看到您的代理配置包含 JAAS / SASL 属性,这些属性在测试中的生产者配置中缺失。根据错误,无法对客户端进行身份验证。也许你应该打印出

producerProps
并进行调试。

要使用您在配置文件中定义的生产者属性,您需要使用

private KafkaTemplate<String, String> producer;
字段,而不是
producerTest
局部变量

此外,根据 Spring Boot 文档,您的

properties
条目应如下所示

properties:
    [sasl.mechanism]: PLAIN
    [security.protocol]: SASL_PLAINTEXT
    [request.timeout.ms]: 20000
    [max.in.flight.requests.per.connection]: 1

要从 Spring 配置向客户端添加身份验证,您需要在生产者和消费者配置中的某个位置设置

sasl.jaas.config
。否则,您将继续收到身份验证用户名/密码被拒绝的错误。

properties:
    [sasl.jaas.config]: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";'

0
投票

如果代理不使用 SASL 并且您为

SASL_SSL
属性指定
SASL_PLAINTEXT
security.protocol
(就像您在问题中的
application.yaml
文件中所做的那样),您将收到此错误消息。

尝试使用

PLAIN
(在您的具体情况下)或
SSL
代替
security.protocol
将排除这种可能性。

© www.soinside.com 2019 - 2024. All rights reserved.