我已经使用Spring Cloud流API编写了一个Kafka流应用程序,但无法在KTable中看到任何消息。我无法追踪问题。任何指针或帮助表示赞赏。
下面是代码
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class KafkaStreamsSample {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsSample.class, args);
}
//bin/confluent local produce user -- --property parse.key=true --property key.separator=~
//2~{"id": "2", "name": "john", "age": 43}
//1~{"id": "1", "name": "bob", "age": 44}
//3~{"id": "3", "name": "peter", "age": 45}
//4~{"id": "4", "name": "mark", "age": 46}
//2~{"id": "2", "name": "john", "age": 99}
//3~{"id": "3", "name": "paul", "age": 98}
public static class KStreamToTableJoinApplication {
@Bean
public Function<KStream<String, User>, KTable<String, User>> process() {
return input -> input
.groupByKey()
.reduce((aggValue, newValue) -> newValue, Materialized.as ("allusers"));
}
}
}
application.yml
spring.application.name: stream-global-sample
spring.cloud.stream.bindings.process-in-0:
destination: user
spring.cloud.stream.bindings.process-out-0:
destination: usertable
spring.cloud.stream.kafka.streams.bindings.process-out-0:
producer:
materializedAs: allusers
使用Spring Cloud Stream Kafka Streams活页夹,您不能将出站作为KTable
。它必须是KStream
。将您的签名更改为此:public Function<KStream<String, User>, KStream<String, User>>
。然后,在您的toStream()
呼叫中呼叫reduce
。这将给您返回KStream
。这应该使您可以在出站主题中查看输出。您通过reduce
操作获得的结果将通过KTable
实现到状态存储中。因此,如果您愿意,可以通过交互式查询直接查询该状态存储。