JohnMeek

Reputation: 69

Pig Changing Schema to required type

I'm a new Pig user.

I have an existing schema which I want to modify. My source data is as follows with 6 columns:

Name        Type    Date        Region    Op    Value
-----------------------------------------------------
john        ab      20130106    D         X     20
john        ab      20130106    D         C     19
john        ab      20130106    D         T     8
john        ab      20130106    E         C     854
john        ab      20130106    E         T     67
john        ab      20130106    E         X     98

and so on. Each Op value is always C, T or X.

I basically want to split my data in the following way into 7 columns:

Name        Type    Date        Region    OpX    OpC   OpT
----------------------------------------------------------
john        ab      20130106    D         20     19    8
john        ab      20130106    E         98     854   67

Basically, split the Op column into 3 columns, one for each Op value. Each of these columns should contain the corresponding value from the Value column.

How can I do this in Pig?

Upvotes: 1

Views: 1902

Answers (1)

Lorand Bendig

Reputation: 10650

One way to achieve the desired result:

IN = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray, 
       date:int, region:chararray, op:chararray, value:int);
A = order IN by op asc;
B = group A by (name, type, date, region);
C = foreach B {
  bs = STRSPLIT(BagToString(A.value, ','),',',3);
  generate flatten(group) as (name, type, date, region), 
    bs.$2 as OpX:chararray, bs.$0 as OpC:chararray, bs.$1 as OpT:chararray;
}

describe C;
C: {name: chararray,type: chararray,date: int,region: chararray,OpX: 
    chararray,OpC: chararray,OpT: chararray}

dump C;
(john,ab,20130106,D,20,19,8)
(john,ab,20130106,E,98,854,67)

Update:

If you want to skip the order by, which adds an additional reduce phase to the computation, you can prefix each value with its corresponding op (the bag v below). Then sort the fields of the resulting tuple bs with a custom UDF to get the desired OpX, OpC, OpT order:

register 'myjar.jar';
A = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray, 
      date:int, region:chararray, op:chararray, value:int);
B = group A by (name, type, date, region);
C = foreach B {
  v = foreach A generate CONCAT(op, (chararray)value);
  bs = STRSPLIT(BagToString(v, ','),',',3);
  generate flatten(group) as (name, type, date, region), 
    flatten(TupleArrange(bs)) as (OpX:chararray, OpC:chararray, OpT:chararray);
}

where TupleArrange in myjar.jar is something like this:

..
import java.io.IOException;
import java.util.Arrays;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class TupleArrange extends EvalFunc<Tuple> {

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public Tuple exec(Tuple input) throws IOException {
        try {
            Tuple result = tupleFactory.newTuple(3);
            Tuple inputTuple = (Tuple) input.get(0);
            String[] tupleArr = new String[] { 
                    (String) inputTuple.get(0),
                    (String) inputTuple.get(1), 
                    (String) inputTuple.get(2) 
            };  
            Arrays.sort(tupleArr); //ascending
            result.set(0, tupleArr[2].substring(1));
            result.set(1, tupleArr[0].substring(1));
            result.set(2, tupleArr[1].substring(1));
            return result;
        }
        catch (Exception e) {
            throw new RuntimeException("TupleArrange error", e);
        }
    }

    @Override
    public Schema outputSchema(Schema input) {
        return input;
    }
}
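
To check the reordering logic without a Pig installation, here is a minimal stand-alone sketch of what exec does, using plain strings instead of Pig tuples. ArrangeSketch and arrange are hypothetical names for illustration only and are not part of myjar.jar; the sketch assumes each group holds exactly one C, one T and one X value, as stated in the question.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the answer's jar: mirrors the reordering
// done in TupleArrange.exec, but on a plain String array.
final class ArrangeSketch {

    // Takes three op-prefixed values in any order, e.g. {"T8", "X20", "C19"},
    // and returns the bare values in OpX, OpC, OpT order.
    static String[] arrange(String[] prefixed) {
        if (prefixed.length != 3) {
            throw new IllegalArgumentException("expected exactly 3 values");
        }
        String[] sorted = prefixed.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);                // ascending by prefix: C..., T..., X...
        return new String[] {
            sorted[2].substring(1),         // X value, prefix stripped
            sorted[0].substring(1),         // C value
            sorted[1].substring(1)          // T value
        };
    }

    public static void main(String[] args) {
        String[] result = arrange(new String[] {"T8", "X20", "C19"});
        System.out.println(Arrays.toString(result)); // prints [20, 19, 8]
    }
}
```

The sort only looks at the one-letter prefix here because the three prefixes are distinct. If a group could miss an op or contain one twice, both this sketch and the UDF above would need extra handling.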

Upvotes: 1
