无法通过Spring Cloud streamng在Kafka Ktable中看到任何消息

问题描述 投票:0回答:1

我已经使用Spring Cloud流API编写了一个Kafka流应用程序,但无法在KTable中看到任何消息。我无法追踪问题。任何指针或帮助表示赞赏。

下面是代码

import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaStreamsSample {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsSample.class, args);
    }

        //bin/confluent local produce user -- --property parse.key=true --property key.separator=~
        //2~{"id": "2", "name": "john", "age": 43}
        //1~{"id": "1", "name": "bob", "age": 44}
        //3~{"id": "3", "name": "peter", "age": 45}
        //4~{"id": "4", "name": "mark", "age": 46}
        //2~{"id": "2", "name": "john", "age": 99}
        //3~{"id": "3", "name": "paul", "age": 98}

    public static class KStreamToTableJoinApplication {

        @Bean
        public Function<KStream<String, User>, KTable<String, User>> process() {


                return input -> input
                .groupByKey()
                .reduce((aggValue, newValue) -> newValue, Materialized.as ("allusers"));
        }
    }
}

application.yml

spring.application.name: stream-global-sample
spring.cloud.stream.bindings.process-in-0:
  destination: user
spring.cloud.stream.bindings.process-out-0:
  destination: usertable
spring.cloud.stream.kafka.streams.bindings.process-out-0:
  producer:
    materializedAs: allusers
apache-kafka spring-kafka spring-cloud-stream
1个回答
0
投票

使用Spring Cloud Stream Kafka Streams活页夹,您不能将出站作为KTable。它必须是KStream。将您的签名更改为此:public Function<KStream<String, User>, KStream<String, User>>。然后,在您的toStream()呼叫中呼叫reduce。这将给您返回KStream。这应该使您可以在出站主题中查看输出。您通过reduce操作获得的结果将通过KTable实现到状态存储中。因此,如果您愿意,可以通过交互式查询直接查询该状态存储。

© www.soinside.com 2019 - 2024. All rights reserved.