Abhishek Mishra

Reputation: 21

how to handle skewed data while grouping in pig

I am doing a GROUP BY operation in which one reduce task runs much longer than the rest. Below is a sample code snippet and a description of the issue:

inp = LOAD 'input' USING PigStorage('|') AS (f1,f2,f3,f4,f5);

grp_inp = GROUP inp BY (f1,f2) PARALLEL 300;

Since the data is skewed, i.e. one key has too many values, one reducer runs for 4 hours while all the other reduce tasks complete in a minute or so.

What can I do to fix this issue? Are there any alternative approaches? Any help would be greatly appreciated. Thanks!

Upvotes: 0

Views: 1235

Answers (1)

sorabh

Reputation: 81

You may have to check a few things:

1> Filter out records in which both f1 and f2 are NULL (if any); all records with a NULL grouping key end up on the same reducer.

2> Try to use the Hadoop combiner by implementing the Algebraic interface, if possible:

https://www.safaribooksonline.com/library/view/programming-pig/9781449317881/ch10s02.html

3> Use a custom partitioner to distribute data across reducers by another key.

Here is the sample code I used to partition my skewed data after a join (the same can be used after a group as well):

import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;

public class KeyPartitioner extends Partitioner&lt;PigNullableWritable, Writable&gt; {

    /**
     * The key holds the current grouping key, and the Writable value
     * contains all fields of the tuple. I partition on the field at
     * index 5 because I knew it has evenly distributed values.
     */
    @Override
    public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
        Tuple valueTuple = (Tuple) ((NullableTuple) value).getValueAsPigType();
        try {
            if (valueTuple.size() > 5) {
                // Use the evenly distributed field at index 5 as the partition key.
                Object hashObj = valueTuple.get(5);
                int keyHash = Integer.parseInt(hashObj.toString());
                return Math.abs(keyHash) % numPartitions;
            } else if (valueTuple.size() > 0) {
                // Fall back to hashing the first field of the tuple.
                return Math.abs(valueTuple.get(0).hashCode()) % numPartitions;
            }
        } catch (NumberFormatException | ExecException ex) {
            Logger.getLogger(KeyPartitioner.class.getName()).log(Level.SEVERE, null, ex);
        }
        // Last resort: hash the grouping key itself.
        return Math.abs(key.hashCode()) % numPartitions;
    }
}
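To tie this back to the original script, the partitioner can be plugged into the GROUP via Pig's PARTITION BY clause. A rough sketch, assuming the class above is packaged into a jar (the jar name here is a placeholder), combining the NULL filter from step 1 with the custom partitioner from step 3:

```pig
-- Register the jar containing KeyPartitioner (path is an example)
REGISTER mypartitioner.jar;

inp = LOAD 'input' USING PigStorage('|') AS (f1,f2,f3,f4,f5);

-- Step 1: drop records whose whole grouping key is NULL
filtered = FILTER inp BY f1 IS NOT NULL OR f2 IS NOT NULL;

-- Step 3: let KeyPartitioner decide which reducer each record goes to
grp_inp = GROUP filtered BY (f1,f2) PARTITION BY KeyPartitioner PARALLEL 300;
```

If KeyPartitioner lives in a package, use its fully qualified class name after PARTITION BY.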

Upvotes: 1
