Apache Flink 中 RoundRobin 分区的实现

问题描述 投票:0回答:1

嗨,我想在 ApacheFlink 中为运算符实现 RoundRobin 实现,在继续之前,我想先说一下,我很清楚这已经在 Flink 中实现了,但这只是我想要的实现之一喜欢,然后修改以创建w-choices(参见https://arxiv.org/pdf/1510.05714.pdf

这是我正在尝试执行的操作的图形表示,其中我尝试执行的操作是

max
元组对的
<key:String (a letter) ; value: Int>
操作:

目前我必须模仿的是以下代码:

DataStream<Tuple2<String, Integer>> split = operatorAggregateStream
           .partitionCustom(new RoundRobin(), value->value.f0 )
           .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
           .process(new MaxPartialWindowProcessFunction());

DataStream<Tuple2<String, Integer>> reconciliation = split
           .partitionCustom(new SingleCast(), value->value.f0 )
           .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
           .process(new MaxPartialWindowProcessFunction());

我首先以循环方式进行分区,然后为 processfunction 应用一个窗口并执行“部分函数应用程序”,然后使用

singlecast()
我将所有内容汇聚到一个分区,但这在查看输出时似乎不起作用。

这是 roundRobin 和 singleCast 的代码

public class SingleCast implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {

        return  0;
    }
}
public class RoundRobin implements Partitioner<String> {

    int index;

    RoundRobin() {
        index = 0;
    }

    @Override
    public int partition(String key, int numPartitions) {
        index++;

        return  Math.abs(this.index % numPartitions) ;
    }
}

以及 maxPartialFunciton 的代码:

public class MaxPartialWindowProcessFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    // Define a ReducingState to store the maximum value seen so far in the window

    private HashMap<String, Integer> map ;

    @Override
    public void open(Configuration parameters) throws Exception {
        map = new HashMap<>();
    }
    
    @Override
    public void process(Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        for(Tuple2<String, Integer> value : input){
            String key = value.f0;
            synchronized (map){
                if(!map.containsKey(key)){
                    map.put(key, value.f1);
                }
                else if(value.f1 > map.get(key)){
                    map.put(key, value.f1);
                }
            }
        }

        for (String k : map.keySet()) {
            collector.collect(Tuple2.of(k, map.get(k)));
        }
    }
}

当创建一个更基本的实现而无需Windows时,roundRobin和SingleCast可以完美地工作,我有以下内容:

DataStream<Tuple2<String, Integer>> aggregation = operatorAggregateStream
           .partitionCustom(new RoundRobin(), value->value.f0 ) //any cast
           .process(new MaxPartialFunction());

DataStream<Tuple2<String, Integer>> reconciliation = aggregation
           .partitionCustom(new SingleCast(), value->value.f0 )
           .process(new MaxPartialFunction());

我的窗口实现的问题是它似乎没有正确分区,并且只创建三个或更少的分区。然后,当它协调时,它无法正确协调,它需要三个不同的分区。当使用flink的打印方法时,我得到这个输出

split:13> (A,100)
split:15> (C,100)
split:14> (B,100)
reconciliation:2> (B,100)
reconciliation:1> (A,100)
reconciliation:3> (C,100)

理想情况下,应该出现的是多个分割分区(如果我没记错的话,通常默认情况下应该有 16 个)和一个协调分区。

请帮助我,我已无计可施

apache-flink flink-streaming data-stream
1个回答
0
投票

进行部分/未加密的最大值是一个有趣的优化,但很难。主要问题是,为了使无键最大窗口操作符能够与窗口一起正常工作,它需要了解时间(这样它就可以在下游完整最大窗口操作符需要时刷新结果)。

但是您无法访问常规的 Flink 计时器,因为它是一个无密钥的流。所以现在您正在使用自己的计时器服务、检查水印等。这意味着您确实需要在 Flink 功能之上的级别实现一个运算符。这是可能的,但并非微不足道。

© www.soinside.com 2019 - 2024. All rights reserved.