我有一个SpringBoot应用程序,该应用程序将Kafka与EhCache结合使用,以在不同的MicroServices和实例之间执行缓存同步。我正在使用具有匹配的Kafka-Client版本的SpringBoot 2.2.4。如何测试我的Kafka Client与嵌入式Kafka一起正常工作。
我尝试过:
测试类
@RunWith(SpringRunner.class)
@SpringBootTest()
@ActiveProfiles({"inmemory", "test", "kafka-test"})
@WebAppConfiguration
@DirtiesContext
public class CachePropagatorTest
{
private static final String topic = "com.allstate.d3.sh.test.cache";
//private static final String topic2 = "com.allstate.sh.test.alloc";
//@Rule
@ClassRule
public static final EmbeddedKafkaRule embeddedKafkaRule;
static
{
embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic);
embeddedKafkaRule
.getEmbeddedKafka().brokerListProperty("spring.kafka.bootstrap-servers");
}
//@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
KafkaTemplate<String, CacheMessage> kafkaTemplate;
@Autowired
KafkaSHProperties properties;
//@Autowired
@SpyBean
CachePropagator propagator;
//CachePropagationHelper propagator;
BlockingQueue<CacheMessage> records = new LinkedBlockingQueue<>();
/* read sent messages */
Consumer<Integer, CacheMessage> consumer;
private String topic1;
@Before
public void setUp() throws Exception
{
embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
topic1 = properties.getCacheTopic();
assertThat(topic1, is(topic));
//embeddedKafka.getEmbeddedKafka().addTopics(topic1);
try { embeddedKafka.addTopics(topic1); }
catch (KafkaException Ignored) { }
Mockito.doAnswer(new Answer<Void>()
{
@Override
public Void answer(InvocationOnMock invocation) throws Throwable
{
System.out.println("Cache Message Receive");
records.add((CacheMessage) invocation.getArgument(0));
return (Void)invocation.callRealMethod();
}
}).when(propagator).receive(ArgumentMatchers.any(),
ArgumentMatchers.anyString());
//prove raw template usage
CacheMessage cm = new CacheMessage("Test","Test","put",
true,"");
kafkaTemplate.send(topic1, cm);
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(properties.getCacheConsumptionGroup(),
"false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, CacheMessage> cf =
new DefaultKafkaConsumerFactory<Integer, CacheMessage>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
Set<String> topics = embeddedKafka.getTopics();
assertThat(topics.size(),is(1) );
assertThat(topics,hasItem(topic1) );
//prove sent message received
ConsumerRecord<Integer, CacheMessage> received =
KafkaTestUtils.getSingleRecord(consumer, topic1, 30000);
assertThat(received.value(), is("Test"));
}
@After
public void tearDown() throws Exception { }
@Test
public void putExperiment() throws Exception
{
Date now = new Date();
JsonNode emptyNode = new ObjectMapper().readTree("");
List<BucketDetail> buckets = new ArrayList<>();
buckets.add(new BucketDetail("99-1", "Kafka Bucket 1",
0.5, emptyNode));
buckets.add(new BucketDetail("99-2", "Kafka Bucket 2",
0.5, emptyNode));
buckets.add(new BucketDetail());
ExperimentDetail exp = new ExperimentDetail("99", 1,
"KafkaTest",
"SH_TEST_PROFILE_9",
buckets, LifecycleStage.CONFIGURED,
now, null, "Mete Test Notes");
propagator.putExperiment(exp);
//TODO: test the allocation was correct
ConsumerRecord<Integer, CacheMessage> received =
KafkaTestUtils.getSingleRecord(consumer, topic1, 10000);
//TODO: how much should this verify in the message
assertThat(received.value().getAction(), is("put"));
assertThat(received.value().getItem().toString(),
containsString(exp.getExperimentID()));
}
}
[
spring:
kafka:
bootstrap-servers: localhost:2181
listener:
#add topics after start
missing-topics-fatal: false
properties:
sasl:
kerberos:
service:
name: kafka
security:
protocol: SASL_PLAINTEXT
consumer:
properties:
spring:
json:
trusted:
packages: com.allstate.d3.sh.commons.messaging
bootstrap-servers: localhost:2181
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
bootstrap-servers: localhost:2181
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
profiles:
active: inmemory,kafka-test
[运行测试时,它在单位setup()
方法中失败,并带有
java.lang.IllegalStateException: No records found for topic at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:187) at com.allstate.d3.sh.execution.event.CachePropagatorTest.setUp(CachePropagatorTest.java:169) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137) at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63) at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55) at java.lang.Thread.run(Thread.java:748)
所以为什么找不到该主题的任何已发送记录?
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(properties.getCacheConsumptionGroup(),
"false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, CacheMessage> cf =
new DefaultKafkaConsumerFactory<Integer, CacheMessage>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
kafkaTemplate.send(topic1, cm);