嗨,我想在 ApacheFlink 中为运算符实现 RoundRobin 实现,在继续之前,我想先说一下,我很清楚这已经在 Flink 中实现了,但这只是我想要的实现之一喜欢,然后修改以创建w-choices(参见https://arxiv.org/pdf/1510.05714.pdf)
这是我正在尝试执行的操作的图形表示,其中我尝试执行的操作是
max
元组对的 <key:String (a letter) ; value: Int>
操作:
目前我必须模仿的是以下代码:
DataStream<Tuple2<String, Integer>> split = operatorAggregateStream
.partitionCustom(new RoundRobin(), value->value.f0 )
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new MaxPartialWindowProcessFunction());
DataStream<Tuple2<String, Integer>> reconciliation = split
.partitionCustom(new SingleCast(), value->value.f0 )
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new MaxPartialWindowProcessFunction());
我首先以循环方式进行分区,然后为 processfunction 应用一个窗口并执行“部分函数应用程序”,然后使用
singlecast()
我将所有内容汇聚到一个分区,但这在查看输出时似乎不起作用。
这是 roundRobin 和 singleCast 的代码
public class SingleCast implements Partitioner<String> {
@Override
public int partition(String key, int numPartitions) {
return 0;
}
}
public class RoundRobin implements Partitioner<String> {
int index;
RoundRobin() {
index = 0;
}
@Override
public int partition(String key, int numPartitions) {
index++;
return Math.abs(this.index % numPartitions) ;
}
}
以及 maxPartialFunciton 的代码:
public class MaxPartialWindowProcessFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
// Define a ReducingState to store the maximum value seen so far in the window
private HashMap<String, Integer> map ;
@Override
public void open(Configuration parameters) throws Exception {
map = new HashMap<>();
}
@Override
public void process(Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> collector) throws Exception {
for(Tuple2<String, Integer> value : input){
String key = value.f0;
synchronized (map){
if(!map.containsKey(key)){
map.put(key, value.f1);
}
else if(value.f1 > map.get(key)){
map.put(key, value.f1);
}
}
}
for (String k : map.keySet()) {
collector.collect(Tuple2.of(k, map.get(k)));
}
}
}
当创建一个更基本的实现而无需Windows时,roundRobin和SingleCast可以完美地工作,我有以下内容:
DataStream<Tuple2<String, Integer>> aggregation = operatorAggregateStream
.partitionCustom(new RoundRobin(), value->value.f0 ) //any cast
.process(new MaxPartialFunction());
DataStream<Tuple2<String, Integer>> reconciliation = aggregation
.partitionCustom(new SingleCast(), value->value.f0 )
.process(new MaxPartialFunction());
我的窗口实现的问题是它似乎没有正确分区,并且只创建三个或更少的分区。然后,当它协调时,它无法正确协调,它需要三个不同的分区。当使用flink的打印方法时,我得到这个输出
split:13> (A,100)
split:15> (C,100)
split:14> (B,100)
reconciliation:2> (B,100)
reconciliation:1> (A,100)
reconciliation:3> (C,100)
理想情况下,应该出现的是多个分割分区(如果我没记错的话,通常默认情况下应该有 16 个)和一个协调分区。
请帮助我,我已无计可施
进行部分/未加密的最大值是一个有趣的优化,但很难。主要问题是,为了使无键最大窗口操作符能够与窗口一起正常工作,它需要了解时间(这样它就可以在下游完整最大窗口操作符需要时刷新结果)。
但是您无法访问常规的 Flink 计时器,因为它是一个无密钥的流。所以现在您正在使用自己的计时器服务、检查水印等。这意味着您确实需要在 Flink 功能之上的级别实现一个运算符。这是可能的,但并非微不足道。