Kafka KStreams中的'for'循环支持

问题描述 投票:0回答:1

我需要知道如何在我的kafka KStreams行中使用'for'循环...下面是我的'for'循环,需要包含在KStreams中

for (int i = 0; i < 6 ; i++) {
            try {
                textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
                Thread.sleep(2000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }

我的KStreams看起来像

KStream<String, String> textlines = builder.stream("intopic");
KStream<String, String> mstream = textlines
                .mapValues(value -> value.replace("[","" ) )

如何将我上面的'for'循环添加到我的KStreams中

apache-kafka apache-kafka-streams
1个回答
1
投票

事情是我在'for'循环中使用value.split来分割我的数据....所以每当我的数据被分割时,它应该睡眠大约10ms ......这是因为我需要我的数据一个接一个地来

从你说你想要的订购。要实现订购,您不需要sleep。它会起作用。 Kafka Streams WordCount示例,我假设您的代码基于,以相同的方式工作:它也使用flatMapValues,并且传递到平面地图的lambda将文本行拆分为单词。

除非我和其他人误解你的问题(在这种情况下你应该进一步澄清你的问题),我认为你不必要地使你的代码复杂化。

© www.soinside.com 2019 - 2024. All rights reserved.