我需要知道如何在我的kafka KStreams行中使用'for'循环...下面是我的'for'循环,需要包含在KStreams中
for (int i = 0; i < 6 ; i++) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
我的KStreams看起来像
KStream<String, String> textlines = builder.stream("intopic");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[","" ) )
如何将我上面的'for'循环添加到我的KStreams中
事情是我在'for'循环中使用value.split来分割我的数据....所以每当我的数据被分割时,它应该睡眠大约10ms ......这是因为我需要我的数据一个接一个地来
从你说你想要的订购。要实现订购,您不需要sleep
。它会起作用。 Kafka Streams WordCount示例,我假设您的代码基于,以相同的方式工作:它也使用flatMapValues
,并且传递到平面地图的lambda将文本行拆分为单词。
除非我和其他人误解你的问题(在这种情况下你应该进一步澄清你的问题),我认为你不必要地使你的代码复杂化。