创建 EmbeddedKafka 代理时,使用 @EmbeddedKafka 进行测试失败并出现 TimeoutException

问题描述 投票:0回答:1

从 Spring-Boot 2.4 -> 2.7 升级并升级 Kafka 2.8.2 -> 3.6.1 后使用我的应用程序运行单元测试

我的单元测试设置如下:

@RunWith(SpringRunner.class)
@SpringBootTest
@WebAppConfiguration
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@ActiveProfiles("inmemory")
@EmbeddedKafka(topics={"KafkaDataHelperNonSpyUnitTest.topic"}, //partitions = 3,
               adminTimeout = 30)
@TestPropertySource(value = "classpath:hsql-test.properties",
                    properties = "spring.cache.type=NONE")
public class KafkaDataHelperNonSpyUnitTest
{
  /*
   * Setup Beans Here
   */  

   @Test
   public void testGetConsumer()
   {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedBrokers);
      props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
      props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);

      kafkaDataHelper.getConsumer("KafkaDataHelperNonSpyUnitTest.topic", props);
   }
}

测试失败

Failed to load ApplicationContext
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:98)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:124)
    at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
    at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:248)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
    at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
    at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
    at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:431)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:116)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:333)
    at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:605)
    at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:374)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:307)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:136)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:141)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:90)
    ... 43 more
Caused by: org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException
    at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:490)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.lambda$createKafkaTopics$8(EmbeddedKafkaBroker.java:477)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.doWithAdmin(EmbeddedKafkaBroker.java:573)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.createKafkaTopics(EmbeddedKafkaBroker.java:476)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:373)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
    ... 52 more
Caused by: java.util.concurrent.TimeoutException
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:487)
    ... 58 more

那么,为什么测试在创建嵌入式 Kafka Broker 时会超时?
更重要的是,我该如何解决它?

更新:

相关

build.gradle
依赖项片段

buildscript
{
   ext { springBootVersion = '2.7.18' }

   dependencies
   {
      classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
   }
}

apply plugin: "org.springframework.boot"
apply plugin: 'io.spring.dependency-management'

dependencies
{
    // Spring Framework (Core)
    def springFrameworkVersion = 5.3.27
    implementation ('org.springframework:spring-core')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-aop')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-context')
    { version { strictly springFrameworkVersion } }
    implementation('org.springframework:spring-context-support')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-beans')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-messaging')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-expression')
    { version { strictly springFrameworkVersion } }

    // Spring Framework (Data)
    implementation('org.springframework:spring-jdbc')
    { version { strictly springFrameworkVersion } }
    implementation('org.springframework:spring-tx')
    { version { strictly springFrameworkVersion } }

    // Spring Framework (Web)
    implementation ('org.springframework:spring-web')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-websocket')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-webmvc')
    { version { strictly springFrameworkVersion } }
    implementation ('org.springframework:spring-webflux')
    { version { strictly springFrameworkVersion } }

    implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
    testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"

    // Spring Framework (Security)
    def springSecurityVersion = '5.7.11'
    implementation("org.springframework.security:spring-security-core")
    { version { strictly springSecurityVersion } }
    implementation("org.springframework.security:spring-security-web")
    { version { strictly springSecurityVersion } }
    implementation("org.springframework.security:spring-security-ldap")
    { version { strictly springSecurityVersion } }
    implementation("org.springframework.security:spring-security-crypto")
    { version { strictly springSecurityVersion } }
    testImplementation("org.springframework.security:spring-security-test")
    { version { strictly springSecurityVersion } }

    implementation "org.springframework.security.oauth:spring-security-oauth2:2.5.2.RELEASE"

    implementation"org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"
    implementation('org.springframework.retry:spring-retry:1.3.3')

    runtimeOnly("org.springframework.boot:spring-boot-properties-migrator:${springBootVersion}")

    // Kafka
    def KafkaClientVersion = '3.6.1'
    implementation('org.springframework.kafka:spring-kafka:')
    { version { strictly('2.9.13') } }
    testImplementation('org.springframework.kafka:spring-kafka-test')
    { version { strictly('2.9.13') } }
    implementation('org.apache.kafka:kafka-clients')
    { version { strictly(KafkaClientVersion) } }
    testImplementation('org.apache.kafka:kafka-streams')
    { version { strictly(KafkaClientVersion) } }
    implementation('org.apache.kafka:kafka-server-common')
    { version { strictly(KafkaClientVersion) } }
    testImplementation('org.apache.kafka:kafka-server-common')
    { version { strictly(KafkaClientVersion) } }
    implementation('org.apache.kafka:kafka-storage')
    { version { strictly(KafkaClientVersion) } }
    testImplementation('org.apache.kafka:kafka-tools')
    { version { strictly(KafkaClientVersion) } }
    testImplementation('org.apache.kafka:connect-json')
    { version { strictly(KafkaClientVersion) } }
    testImplementation('org.apache.kafka:connect-api')
    { version { strictly(KafkaClientVersion) } }

    testImplementation("org.apache.kafka:kafka_2.13:${KafkaClientVersion}:test")
}
java spring spring-boot apache-kafka spring-kafka
1个回答
0
投票

在尝试了很多事情之后。我最终转向了另一个应用程序。遇到同样的错误。

我找到的解决方案是从

@EmbeddedKafka
注释中删除主题名称。

© www.soinside.com 2019 - 2024. All rights reserved.