如何通过避免apache spark中的flatmap操作来提高性能

问题描述 投票:0回答:1

我正在针对我的java数据对象运行一组规则。对于每个项目,我正在处理规则列表。

通常我有100万件物品和100条规则。

目前在火花中运行此程序需要15分钟。

我观察到faltMaptopair需要更多时间。我想提高这个程序的性能。

Get the rules
map each item against the list of rules and produce result set
return JavaPairRDD of itemId and List<RuleResult>

任何重构此代码以进一步提高性能的建议

我写了以下代码。

public JavaPairRDD<String, List<RuleResult>> validate() {       
        List<ExecutableRule<T>> rules = ruleWrapper.getRulesList().collect();
        JavaPairRDD<String, List<RuleResult>> resultsPairRDD = itemsForValidation
                .map(x -> getRulesResult(rules, x))
                .flatMapToPair(this::mapToRuleResultById)
                .aggregateByKey(
                        MapperUtil.<RuleResult>newList(),
                        MapperUtil::addToList,
                        MapperUtil::combineLists
                );      
        return resultsPairRDD;
    }

    private List<Tuple2<String, RuleResult>> mapToRuleResultById(List<RuleResult> ruleResults) {
        return ruleResults.stream()
                .map(ruleResult -> new Tuple2<>(ruleResult.getItemId(), ruleResult))
                .collect(toList());
    }

    private List<RuleResult> getRulesResult(List<ExecutableRule<T>> rules, T x) {
        return rules.stream()
                .map(rule -> rule.execute(x)).collect(toList());
    }

    public  RuleResult execute(T t){
    //get the rule result
    }

    public class RuleResult{
        private String itemId;
    }
java apache-spark apache-spark-dataset
1个回答
1
投票

也许我误解了一些东西,但我认为不需要flatMapaggregateByKey

public JavaPairRDD<String, List<RuleResult>> validate() {       
    List<ExecutableRule<T>> rules = ruleWrapper.getRulesList().collect();
    JavaPairRDD<String, List<RuleResult>> resultsPairRDD = itemsForValidation
            .map(x -> new Tuple2<>(x, getRulesResult(rules, x)));
    return resultsPairRDD;
}

那不行吗?

© www.soinside.com 2019 - 2024. All rights reserved.