我有以下课程:
public static class GenerateMetaAlert implements WindowFunction<Tuple2<String, Boolean>, Tuple2<String, Boolean>, Tuple, TimeWindow> {
@Override
public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, Boolean>> iterable, Collector<Tuple2<String, Boolean>> collector) throws Exception {
//code
}
}
我要做的是对于集合中的每个元素,在字段中有任何其他具有相反值的元素。
一个例子:
Iterable: [<val1,val2>,<val3,val4>,<val5,val6>,...,<valx,valy>]
|| || || ||
elem1 elem2 elem3 elemn
我想测试一下:
foreach(element)
if elem(i).f0 = elem(i+1).f0 then ...
if elem(i).f0 = elem(i+2).f0 then ...
<...>
if elem(i+1).f0 = elem(i+2).f0 then ...
<...>
if elem(n-1).f0 = elem(n).f0 then ...
我想这可以使用这样的东西:
Tuple2<String, Boolean> tupla = iterable.iterator().next();
iterable.iterator().forEachRemaining((e)->{
if ((e.f0 == tupla.f0) && (e.f1 != tupla.f1)) collector.collect(e);});
但就像我是Java新手一样,我不知道如何以最佳方式实现它。
这是使用Apache Flink的Java程序的一部分:
.keyBy(0, 1)
.timeWindow(Time.seconds(60))
.apply(new GenerateMetaAlert())
测试:
使用以下代码:
public static class GenerateMetaAlert implements WindowFunction<Tuple2<String, Boolean>, Tuple2<String, Boolean>, Tuple, TimeWindow> {
@Override
public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, Boolean>> iterable, Collector<Tuple2<String, Boolean>> collector) throws Exception {
System.out.println("key: " +key);
StreamSupport.stream(iterable.spliterator(), false)
.collect(Collectors.groupingBy(t -> t.f0)) // yields a Map<String, List<Tuple2<String, Boolean>>>
.values() // yields a Collection<List<Tuple2<String, Boolean>>>
.stream()
.forEach(l -> {
System.out.println("l.size: " +l.size());
// l is the list of tuples for some common f0
while (l.size() > 1) {
Tuple2<String, Boolean> t0 = l.get(0);
System.out.println("t0: " +t0);
l = l.subList(1, l.size());
l.stream()
.filter(t -> t.f1 != t0.f1)
.forEach(t -> System.out.println("t: "+ t));
}
});
}
}
结果是:
key: (868789022645948,true)
key: (868789022645948,false)
l.size: 2
l.size: 2
t0: (868789022645948,true)
t0: (868789022645948,false)
这个测试的结论:就像.filter(t -> t.f1 != t0.f1)
的条件从未得到满足
如果我为.filter(t -> t.f1 != t0.f1)
(或false)更改.filter(t -> t.f1 != true)
,则过滤器可以正常工作
我还使用以下内容:
final Boolean[] aux = new Boolean[1];
<...>
Tuple2<String, Boolean> t0 = l.get(0);
aux[0] = t0.f1;
<...>
.filter(t -> !t.f1.equals(aux[0]))
但即便如此,我也没有任何输出(当我使用t.f1.equals(aux[0])
时我只有它
Iterable
允许你在你的元素上获得尽可能多的Iterator
s,但是每个元素都遍历所有元素,只有一次。因此,您使用forEachRemaining()
的想法将无法正常工作。因为你正在生成一个新的Iterator
来调用该方法,所以它将从头开始,而不是在其他迭代器最近提供的元素之后。
你可以做的是通过使用Stream
的Iterable
创建一个Spliterator
,并使用Collector
分组将迭代的元组按其第一个值分组。然后,您可以根据需要处理元组列表。
例如,虽然我怀疑它是否是您真正想要的,但这实现了问题中描述的逻辑:
StreamSupport.stream(iterable.spliterator(), false)
.collect(Collectors.groupingBy(t -> t.f0)) // yields a Map<String, List<Tuple2<String, Boolean>>>
.values() // yields a Collection<List<Tuple2<String, Boolean>>>
.stream()
.forEach(l -> {
// l is the list of tuples for some common f0
while (l.size() > 1) {
Tuple2<String, Boolean> t0 = l.get(0);
l = l.subList(1, l.size());
l.stream()
.filter(t -> t.f1 != t0.f1)
.forEach(t -> collect(t));
}
});
请注意,可以多次收集相同的元组,如下所示,从您的伪代码。如果你想要一些不同的东西,例如只收集代表给定f1
的f0
值的元组的元组,那么你需要在外部forEach()
操作中使用不同的lambda实现。