How to test Kafka client configuration in Spring Boot

Question

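I have a Spring Boot application that uses Kafka together with EhCache to synchronize caches across different microservices and instances. I am using Spring Boot 2.2.4 with the matching kafka-clients version. How can I test that my Kafka client works correctly with embedded Kafka?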

Here is what I tried.

Test class:

@RunWith(SpringRunner.class)
@SpringBootTest()
@ActiveProfiles({"inmemory", "test", "kafka-test"})
@WebAppConfiguration
@DirtiesContext
public class CachePropagatorTest
{
   private static final String topic = "com.allstate.d3.sh.test.cache";
   //private static final String topic2 = "com.allstate.sh.test.alloc";

   //@Rule
   @ClassRule
   public static final EmbeddedKafkaRule embeddedKafkaRule;

   static
   {
      embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic);
      embeddedKafkaRule
              .getEmbeddedKafka().brokerListProperty("spring.kafka.bootstrap-servers");
   }

   //@Autowired
   private EmbeddedKafkaBroker embeddedKafka;

   @Autowired
   KafkaTemplate<String, CacheMessage> kafkaTemplate;

   @Autowired
   KafkaSHProperties properties;

   //@Autowired
   @SpyBean
   CachePropagator propagator;
   //CachePropagationHelper propagator;

   BlockingQueue<CacheMessage> records = new LinkedBlockingQueue<>();

   /* read sent messages  */
   Consumer<Integer, CacheMessage> consumer;

   private String topic1;


   @Before
   public void setUp() throws Exception
   {
      embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();

      topic1 = properties.getCacheTopic();
      assertThat(topic1, is(topic));
      //embeddedKafka.getEmbeddedKafka().addTopics(topic1);
      try { embeddedKafka.addTopics(topic1); }
      catch (KafkaException Ignored) { }


      Mockito.doAnswer(new Answer<Void>()
      {
         @Override
         public Void answer(InvocationOnMock invocation) throws Throwable
         {
            System.out.println("Cache Message Receive");
            records.add((CacheMessage) invocation.getArgument(0));
            return (Void)invocation.callRealMethod();
         }
      }).when(propagator).receive(ArgumentMatchers.any(),
                                  ArgumentMatchers.anyString());

      //prove raw template usage
      CacheMessage cm = new CacheMessage("Test","Test","put",
                                         true,"");
      kafkaTemplate.send(topic1, cm);

      Map<String, Object> consumerProps =
         KafkaTestUtils.consumerProps(properties.getCacheConsumptionGroup(),
                                     "false", embeddedKafka);

      DefaultKafkaConsumerFactory<Integer, CacheMessage> cf =
           new DefaultKafkaConsumerFactory<Integer, CacheMessage>(consumerProps);
      consumer = cf.createConsumer();
      embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

      Set<String> topics = embeddedKafka.getTopics();
      assertThat(topics.size(),is(1) );
      assertThat(topics,hasItem(topic1) );

      //prove sent message received
      ConsumerRecord<Integer, CacheMessage> received =
              KafkaTestUtils.getSingleRecord(consumer, topic1, 30000);
      assertThat(received.value(), is("Test"));

   }

   @After
   public void tearDown() throws Exception { }

   @Test
   public void putExperiment() throws Exception
   {
      Date now = new Date();
      JsonNode emptyNode = new ObjectMapper().readTree("");
      List<BucketDetail> buckets = new ArrayList<>();
      buckets.add(new BucketDetail("99-1", "Kafka Bucket 1",
                                   0.5, emptyNode));
      buckets.add(new BucketDetail("99-2", "Kafka Bucket 2",
                                   0.5, emptyNode));
      buckets.add(new BucketDetail());

      ExperimentDetail exp = new ExperimentDetail("99", 1,
                                                  "KafkaTest",
                                                  "SH_TEST_PROFILE_9",
                                                  buckets, LifecycleStage.CONFIGURED,
                                                  now, null, "Mete Test Notes");

      propagator.putExperiment(exp);

      //TODO: test the allocation was correct


      ConsumerRecord<Integer, CacheMessage> received =
              KafkaTestUtils.getSingleRecord(consumer, topic1, 10000);

      //TODO: how much should this verify in the message
      assertThat(received.value().getAction(), is("put"));
      assertThat(received.value().getItem().toString(),
                 containsString(exp.getExperimentID()));
   }
}

Kafka configuration (YAML):

spring:
  kafka:
    bootstrap-servers: localhost:2181
    listener:
      #add topics after start
      missing-topics-fatal: false
    properties:
      sasl:
        kerberos:
          service:
            name: kafka
      security:
        protocol: SASL_PLAINTEXT
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: com.allstate.d3.sh.commons.messaging
      bootstrap-servers: localhost:2181
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:2181
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  profiles:
    active: inmemory,kafka-test

When I run the test, it fails in the test's setUp() method with:

java.lang.IllegalStateException: No records found for topic
    at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:187)
    at com.allstate.d3.sh.execution.event.CachePropagatorTest.setUp(CachePropagatorTest.java:169)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)

So why are no sent records found for the topic?

spring spring-boot unit-testing testing apache-kafka

1 Answer

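Could you please check the following in your code:

  • The Kafka consumer should be started before the Kafka producer and should keep running even after the producer has published the record.
  • If the Kafka consumer is started after the Kafka producer, it should poll from the beginning offset.
  • Change the execution order of your code so that the consumer is created and subscribed before the message is sent: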

    Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps(properties.getCacheConsumptionGroup(),
                                     "false", embeddedKafka);
    DefaultKafkaConsumerFactory<Integer, CacheMessage> cf =
        new DefaultKafkaConsumerFactory<Integer, CacheMessage>(consumerProps);
    consumer = cf.createConsumer();
    embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
    kafkaTemplate.send(topic1, cm);
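
To illustrate why the order matters, here is a toy in-memory model of a single partition, written in plain Java. It does not use the Kafka API. `ToyPartition`, `ToyConsumer`, and `OffsetResetDemo` are hypothetical names invented for this sketch, and the model is simplified: the start position is fixed when the consumer is created, and there are no committed offsets. It only mirrors the idea behind an `auto-offset-reset` of `latest` versus `earliest`; whether the test consumer in the question actually runs with `latest` depends on how its properties are built.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for one topic partition and one consumer; NOT the Kafka API. */
class OffsetResetDemo {

    /** Append-only log of record values, indexed by offset. */
    static final class ToyPartition {
        private final List<String> log = new ArrayList<>();

        void send(String value) {
            log.add(value);
        }

        /** Offset one past the last record, which is where "latest" starts. */
        int endOffset() {
            return log.size();
        }

        List<String> readFrom(int offset) {
            return new ArrayList<>(log.subList(offset, log.size()));
        }
    }

    /** Consumer without a committed offset (assumption of this sketch). */
    static final class ToyConsumer {
        private final ToyPartition partition;
        private int position;

        ToyConsumer(ToyPartition partition, boolean resetToEarliest) {
            this.partition = partition;
            // "earliest" starts at offset 0, "latest" at the current end of the log.
            this.position = resetToEarliest ? 0 : partition.endOffset();
        }

        /** Returns every record at or after the current position, then advances. */
        List<String> poll() {
            List<String> records = partition.readFrom(position);
            position += records.size();
            return records;
        }
    }

    /** Producer sends first; the consumer is created afterwards. */
    static List<String> sendThenSubscribe(boolean resetToEarliest) {
        ToyPartition partition = new ToyPartition();
        partition.send("Test");
        return new ToyConsumer(partition, resetToEarliest).poll();
    }

    /** Consumer is created first (with "latest"); the producer sends afterwards. */
    static List<String> subscribeThenSend() {
        ToyPartition partition = new ToyPartition();
        ToyConsumer consumer = new ToyConsumer(partition, false);
        partition.send("Test");
        return consumer.poll();
    }

    public static void main(String[] args) {
        System.out.println("send, then subscribe (latest):   " + sendThenSubscribe(false));
        System.out.println("send, then subscribe (earliest): " + sendThenSubscribe(true));
        System.out.println("subscribe (latest), then send:   " + subscribeThenSend());
    }
}
```

In this model only the first case returns an empty list: a consumer that starts at the end of the log after the record was written never sees it. Subscribing before sending, or reading from the earliest offset, both deliver the record.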
