从Spring Cloud Streams Kafka Stream应用中的处理器写入主题

问题描述 投票:0回答:1

我正在使用Processor API对一个状态存储进行一些低级处理。关键是我还需要在存储到存储后写进一个主题。在Spring Cloud Streams Kafka应用中如何实现?

@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->

    event.map{
        ...
    }.process(ProcessorSupplier {

            object : Processor<EventId, MappedEventValue> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun process(key: EventId, value: MappedEventValue) {
                    ...
                    store.put(key, processedMappedEventValue)

                    //TODO Write into a topic
                }
            }
    }
}  
apache-kafka apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka
1个回答
0
投票

你不能这样做。该 process() 方法是一个终端操作,不允许你向下游发射数据。相反,您可以使用 transform() 虽然(基本上是一样的 process() 但允许你向下游发出数据);或者根据你的应用。transformValues()flatTransform() 等。

使用 transform() 你得到 KStream 回,你可以写成一个主题。

© www.soinside.com 2019 - 2024. All rights reserved.