在Flink中处理偏斜数据的其他选项有哪些?

问题描述 投票:0回答:1

我正在研究Flink中的数据偏斜处理以及如何更改低level control of physical partition以便对元组进行均匀处理。我创建了合成偏斜数据源,我的目标是在窗口上处理(聚合)它们。这是complete code

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .rebalance() // or .rescale() .shuffle()
        .keyBy(new StationPlatformKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .setParallelism(4)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;

根据Flink仪表板,我看不出.shuffle().rescale().rebalance()之间的太大差异。即使文档说rebalance()转换更适合数据偏斜。

之后我尝试使用.partitionCustom(partitioner, "someKey")。但是,令我惊讶的是,我无法在窗口操作中使用setParallelism(4)。文件说

注意:此操作本质上是非并行的,因为所有元素都必须通过相同的运算符实例。

我不明白为什么。如果我被允许做partitionCustom,为什么我不能在那之后使用并行性?这是complete code

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;

谢谢,菲利普

java partitioning flink-streaming skew
1个回答
0
投票

我从FLink用户邮件列表中得到了答案。在keyBy()之后基本上使用rebalance()杀死了rebalance()试图做的所有效果。我发现的第一个(ad-hoc)解决方案是创建一个关注偏斜键的复合键。

public class CompositeSkewedKeyStationPlatform implements Serializable {
    private static final long serialVersionUID = -5960601544505897824L;
    private Integer stationId;
    private Integer platformId;
    private Integer skewParameter;
}

我在使用map之前在keyBy()函数上使用它。

public class StationPlatformSkewedKeyMapper
        extends RichMapFunction<MqttSensor, Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor>> {
    private SkewParameterGenerator skewParameterGenerator;

    public StationPlatformSkewedKeyMapper() {
        this.skewParameterGenerator = new SkewParameterGenerator(10);
    }

    @Override
    public Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor> map(MqttSensor value) throws Exception {
        Integer platformId = value.getKey().f2;
        Integer stationId = value.getKey().f4;
        Integer skewParameter = 0;

        if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3))) {
            skewParameter = this.skewParameterGenerator.getNextItem();
        }
        CompositeSkewedKeyStationPlatform compositeKey = new CompositeSkewedKeyStationPlatform(stationId, platformId,
                skewParameter);
        return Tuple2.of(compositeKey, value);
    }
}

这是我的complete solution

© www.soinside.com 2019 - 2024. All rights reserved.