Patan

How to improve performance by avoiding the flatMap operation in Apache Spark

I am running a set of rules against my Java item objects. For each item, I process the list of rules.

Normally I have 1 million items and 100 rules.

Currently this program takes 15 minutes to run in Spark.

I observed that the flatMapToPair step takes a lot of the time. I want to improve the performance of this program.
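With the figures given above, the volume of intermediate data is easy to estimate. This assumes each rule's `execute` returns exactly one `RuleResult` per item, which is what `getRulesResult` in the code below implies:

```latex
10^{6}\ \text{items} \times 100\ \text{rules} = 10^{8}\ \text{RuleResult objects}
```

`flatMapToPair` wraps each of these in its own `Tuple2`, so 10^8 key/value records go into `aggregateByKey`, only to be combined back into one list per item (10^6 lists if item IDs are unique).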

The program does the following:
1. Get the rules.
2. Map each item against the list of rules to produce a result set.
3. Return a JavaPairRDD of itemId and List<RuleResult>.
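The three steps can be modelled in plain Java, without Spark, to make the data flow visible. This is only a sketch under simplifying assumptions: `RulePipelineModel`, the stripped-down `RuleResult`, and rules written as `Function<String, RuleResult>` over an item id are hypothetical stand-ins. `Stream.flatMap` plus `Collectors.groupingBy` play the roles of `flatMapToPair` and `aggregateByKey`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the question's pipeline (no Spark involved).
// RuleResult is a simplified stand-in; each rule is a Function applied to an item id.
class RulePipelineModel {

    static final class RuleResult {
        final String itemId;
        final String outcome;

        RuleResult(String itemId, String outcome) {
            this.itemId = itemId;
            this.outcome = outcome;
        }

        String getItemId() {
            return itemId;
        }

        @Override
        public String toString() {
            return itemId + ":" + outcome;
        }
    }

    // Step 2: evaluate every rule against one item (mirrors getRulesResult).
    static List<RuleResult> rulesResult(List<Function<String, RuleResult>> rules, String item) {
        return rules.stream().map(rule -> rule.apply(item)).collect(Collectors.toList());
    }

    // Step 3 as the question implements it: flatten every item's list into single
    // results (the flatMapToPair step), then group them back by item id
    // (the aggregateByKey step). TreeMap keeps the printed output sorted.
    static Map<String, List<RuleResult>> validate(List<Function<String, RuleResult>> rules,
                                                  List<String> items) {
        return items.stream()
                .map(item -> rulesResult(rules, item))
                .flatMap(List::stream)
                .collect(Collectors.groupingBy(RuleResult::getItemId, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Function<String, RuleResult>> rules = new ArrayList<>();
        rules.add(id -> new RuleResult(id, "ruleA"));
        rules.add(id -> new RuleResult(id, "ruleB"));
        // prints {item1=[item1:ruleA, item1:ruleB], item2=[item2:ruleA, item2:ruleB]}
        System.out.println(validate(rules, List.of("item1", "item2")));
    }
}
```

Note that the regrouping step only reassembles lists that already existed after step 2.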

Any suggestions for refactoring this code to improve performance further?

I have written the following code.

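import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

import static java.util.stream.Collectors.toList;

public JavaPairRDD<String, List<RuleResult>> validate() {
    // Collect the rules to the driver; the list is then shipped with each task
    List<ExecutableRule<T>> rules = ruleWrapper.getRulesList().collect();
    JavaPairRDD<String, List<RuleResult>> resultsPairRDD = itemsForValidation
            // one List<RuleResult> per item (one result per rule)
            .map(x -> getRulesResult(rules, x))
            // flatten into one (itemId, RuleResult) pair per result
            .flatMapToPair(this::mapToRuleResultById)
            // group the results back into one list per itemId
            .aggregateByKey(
                    MapperUtil.<RuleResult>newList(),
                    MapperUtil::addToList,
                    MapperUtil::combineLists
            );
    return resultsPairRDD;
}

// Returning a List (an Iterable) matches the Spark 1.x flatMapToPair signature;
// Spark 2.x expects the function to return an Iterator instead.
private List<Tuple2<String, RuleResult>> mapToRuleResultById(List<RuleResult> ruleResults) {
    return ruleResults.stream()
            .map(ruleResult -> new Tuple2<>(ruleResult.getItemId(), ruleResult))
            .collect(toList());
}

private List<RuleResult> getRulesResult(List<ExecutableRule<T>> rules, T x) {
    return rules.stream()
            .map(rule -> rule.execute(x))
            .collect(toList());
}

// In ExecutableRule<T>
public RuleResult execute(T t) {
    // get the rule result (body elided in the original)
}

public class RuleResult {
    private String itemId;

    public String getItemId() {
        return itemId;
    }
}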

Answers (1)

Glennie Helles Sindholt

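Maybe I'm misunderstanding something, but I don't see the need for either the flatMap or the aggregateByKey. Each item already produces the complete list of its RuleResults, so flattening that list into pairs and then grouping it back by itemId only rebuilds the same lists (assuming item IDs are unique), at the cost of a shuffle.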

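public JavaPairRDD<String, List<RuleResult>> validate() {
    List<ExecutableRule<T>> rules = ruleWrapper.getRulesList().collect();
    // mapToPair (not map) is needed to get a JavaPairRDD back. Assumes every RuleResult
    // for item x carries x's itemId and the rules list is not empty, so the first one supplies the key.
    return itemsForValidation.mapToPair(x -> {
        List<RuleResult> results = getRulesResult(rules, x);
        return new Tuple2<>(results.get(0).getItemId(), results);
    });
}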

Will that not work?
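A plain-Java check of this reasoning, assuming item ids are unique: the sketch builds the result map both ways, by flattening and regrouping as in the question and by emitting one (itemId, results) pair per item, and compares them. `DirectKeyingModel`, its `RuleResult` stand-in, and the Function-based rules are hypothetical; `Collectors.toMap` substitutes for Spark's `mapToPair` and, unlike Spark, rejects duplicate keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory model (no Spark) comparing flatten-and-regroup
// with emitting one (itemId, results) pair per item.
class DirectKeyingModel {

    static final class RuleResult {
        final String itemId;
        final String outcome;

        RuleResult(String itemId, String outcome) {
            this.itemId = itemId;
            this.outcome = outcome;
        }

        String getItemId() { return itemId; }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof RuleResult)) return false;
            RuleResult r = (RuleResult) o;
            return itemId.equals(r.itemId) && outcome.equals(r.outcome);
        }

        @Override
        public int hashCode() { return Objects.hash(itemId, outcome); }
    }

    static List<RuleResult> rulesResult(List<Function<String, RuleResult>> rules, String item) {
        return rules.stream().map(rule -> rule.apply(item)).collect(Collectors.toList());
    }

    // Question's shape: flatten to single results, then regroup by item id.
    static Map<String, List<RuleResult>> flattenAndRegroup(List<Function<String, RuleResult>> rules,
                                                           List<String> items) {
        return items.stream()
                .flatMap(item -> rulesResult(rules, item).stream())
                .collect(Collectors.groupingBy(RuleResult::getItemId));
    }

    // Answer's shape: one (itemId, results) pair per item, no regroup.
    // Here the item string is its own id; toMap throws IllegalStateException on
    // duplicate ids, which makes the "ids are unique" assumption explicit.
    static Map<String, List<RuleResult>> directPairs(List<Function<String, RuleResult>> rules,
                                                     List<String> items) {
        return items.stream()
                .collect(Collectors.toMap(item -> item, item -> rulesResult(rules, item)));
    }

    public static void main(String[] args) {
        List<Function<String, RuleResult>> rules = new ArrayList<>();
        rules.add(id -> new RuleResult(id, "ruleA"));
        rules.add(id -> new RuleResult(id, "ruleB"));
        List<String> items = List.of("item1", "item2", "item3");
        // prints true: both shapes give the same itemId -> results mapping
        System.out.println(flattenAndRegroup(rules, items).equals(directPairs(rules, items)));
    }
}
```

If item ids could repeat, the two shapes would no longer be equivalent, and some grouping step would still be needed.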
