在Iterable上找到一个元素

问题描述 投票:0回答:1

我有以下课程:

public static class GenerateMetaAlert implements WindowFunction<Tuple2<String, Boolean>, Tuple2<String, Boolean>, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, Boolean>> iterable, Collector<Tuple2<String, Boolean>> collector) throws Exception {
        //code
    }
}

我要做的是对于集合中的每个元素,在字段中有任何其他具有相反值的元素。


一个例子:

Iterable: [<val1,val2>,<val3,val4>,<val5,val6>,...,<valx,valy>]
               ||           ||          ||          ||
              elem1        elem2       elem3       elemn 

我想测试一下:

foreach(element)
   if elem(i).f0 = elem(i+1).f0 then ...
   if elem(i).f0 = elem(i+2).f0 then ...
   <...>
   if elem(i+1).f0 = elem(i+2).f0 then ...
   <...>
   if elem(n-1).f0 = elem(n).f0 then ...

我想这可以使用这样的东西:

  Tuple2<String, Boolean> tupla = iterable.iterator().next();
  iterable.iterator().forEachRemaining((e)->{
  if ((e.f0 == tupla.f0) && (e.f1 != tupla.f1)) collector.collect(e);});

但就像我是Java新手一样,我不知道如何以最佳方式实现它。


这是使用Apache Flink的Java程序的一部分:

.keyBy(0, 1)
.timeWindow(Time.seconds(60))
.apply(new GenerateMetaAlert())

测试:

使用以下代码:

public static class GenerateMetaAlert implements WindowFunction<Tuple2<String, Boolean>, Tuple2<String, Boolean>, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, Boolean>> iterable, Collector<Tuple2<String, Boolean>> collector) throws Exception {
        System.out.println("key: " +key);
        StreamSupport.stream(iterable.spliterator(), false)
                .collect(Collectors.groupingBy(t -> t.f0)) // yields a Map<String, List<Tuple2<String, Boolean>>>
                .values()                                  // yields a Collection<List<Tuple2<String, Boolean>>>
                .stream()
                .forEach(l -> {
                    System.out.println("l.size: " +l.size());
                    // l is the list of tuples for some common f0
                    while (l.size() > 1) {
                        Tuple2<String, Boolean> t0 = l.get(0);
                        System.out.println("t0: " +t0);
                        l = l.subList(1, l.size());
                        l.stream()
                                .filter(t -> t.f1 != t0.f1)
                                .forEach(t -> System.out.println("t: "+ t));
                    }
                });
    }
}

结果是:

key: (868789022645948,true)
key: (868789022645948,false)
l.size: 2
l.size: 2
t0: (868789022645948,true)
t0: (868789022645948,false)

这个测试的结论:就像.filter(t -> t.f1 != t0.f1)的条件从未得到满足

如果我为.filter(t -> t.f1 != t0.f1)(或false)更改.filter(t -> t.f1 != true),则过滤器可以正常工作

我还使用以下内容:

    final Boolean[] aux = new Boolean[1];
    <...>
    Tuple2<String, Boolean> t0 = l.get(0);
    aux[0] = t0.f1;
    <...>
    .filter(t -> !t.f1.equals(aux[0]))

但即便如此,我也没有任何输出(当我使用t.f1.equals(aux[0])时我只有它

java apache-flink
1个回答
0
投票

Iterable允许你在你的元素上获得尽可能多的Iterators,但是每个元素都遍历所有元素,只有一次。因此,您使用forEachRemaining()的想法将无法正常工作。因为你正在生成一个新的Iterator来调用该方法,所以它将从头开始,而不是在其他迭代器最近提供的元素之后。

你可以做的是通过使用StreamIterable创建一个Spliterator,并使用Collector分组将迭代的元组按其第一个值分组。然后,您可以根据需要处理元组列表。

例如,虽然我怀疑它是否是您真正想要的,但这实现了问题中描述的逻辑:

StreamSupport.stream(iterable.spliterator(), false)
    .collect(Collectors.groupingBy(t -> t.f0)) // yields a Map<String, List<Tuple2<String, Boolean>>>
    .values()                                  // yields a Collection<List<Tuple2<String, Boolean>>>
    .stream()
    .forEach(l -> {
        // l is the list of tuples for some common f0
        while (l.size() > 1) {
            Tuple2<String, Boolean> t0 = l.get(0);
            l = l.subList(1, l.size());
            l.stream()
                    .filter(t -> t.f1 != t0.f1)
                    .forEach(t -> collect(t));
        }
    });

请注意,可以多次收集相同的元组,如下所示,从您的伪代码。如果你想要一些不同的东西,例如只收集代表给定f1f0值的元组的元组,那么你需要在外部forEach()操作中使用不同的lambda实现。

© www.soinside.com 2019 - 2024. All rights reserved.