我最近开始研究适用于Kafka的Spring Cloud Stream,并努力让TestBinder与Kstreams合作。这是一个已知的限制,还是我只是忽略了一些东西?
这很好用:
字符串处理器
@StreamListener(TopicBinding.INPUT)
@SendTo(TopicBinding.OUTPUT)
public String process(String message) {
return message + " world";
}
字符串测试:
@Test
@SuppressWarnings("unchecked")
public void testString() {
Message<String> message = new GenericMessage<>("Hello");
topicBinding.input().send(message);
Message<String> received = (Message<String>) messageCollector.forChannel(topicBinding.output()).poll();
assertThat(received.getPayload(), equalTo("Hello world"));
}
但是当我尝试在我的过程中使用KStream时,我无法让TestBinder工作。
Kstream处理器:
@SendTo(TopicBinding.OUTPUT)
public KStream<String, String> process(
@Input(TopicBinding.INPUT) KStream<String, String> events) {
return events.mapValues((value) -> value + " world");
}
KStream测试:
@Test
@SuppressWarnings("unchecked")
public void testKstream() {
Message<String> message = MessageBuilder
.withPayload("Hello")
.setHeader(KafkaHeaders.TOPIC, "event.sirism.dev".getBytes())
.setHeader(KafkaHeaders.MESSAGE_KEY, "Test".getBytes())
.build();
topicBinding.input().send(message);
Message<String> received = (Message<String>)
messageCollector.forChannel(topicBinding.output()).poll();
assertThat(received.getPayload(), equalTo("Hello world"));
}
正如您可能已经注意到的那样,我从Kstream处理器中省略了@StreamListener,但没有它,看起来似乎测试条不能找到处理程序。 (但有了它,它在启动应用程序时不起作用)
这是一个已知的错误/限制,还是我只是在做一些愚蠢的事情?
测试绑定器仅适用于基于MessageChannel的绑定器(AbstractMessageChannelBinder
的子类)。 KStreamBinder
不使用MessageChannel
s。
您可以使用spring-kafka-test模块提供的真实绑定器和嵌入式kafka代理进行测试。
另见this issue。