DavidChuBuaa

Reputation: 41

How to join relations in sequence in Hadoop Pig?

I have a single line of data like this:

a\tb1,b2,..,bn\tc1,c2,..,cn

where n is not known in advance. I want to transform it into multiple lines like this:

a\tb1\tc1
a\tb2\tc2
...
a\tbn\tcn

Is this possible in Pig Latin alone, or do I have to write a UDF? If I use this script:

A = LOAD 'file' AS (a, b, c);
B = FOREACH A GENERATE a, FLATTEN(TOKENIZE(b)), FLATTEN(TOKENIZE(c));
dump B;

I get the following result:

a\tb1\tc1
a\tb1\tc2
..
a\tb1\tcn
a\tb2\tc1
a\tb2\tc2
..
a\tb2\tcn
..

This isn't the output I want. Does anyone have ideas?

Upvotes: 0

Views: 122

Answers (2)

DMulligan

Reputation: 9073

IMO too many people who use Pig are reluctant to write UDFs. In your case, the UDF you'd need is fairly simple. Here's some sample code (untested):

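package mysourcepath;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class InSequenceJoin extends EvalFunc<DataBag>
{
    @Override
    public DataBag exec(Tuple input) throws IOException {
        // Both arguments are expected to be comma-separated chararrays.
        String b = (String) input.get(0);
        String c = (String) input.get(1);
        String[] bArray = b.split(",");
        String[] cArray = c.split(",");
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        // Pair the i-th item of b with the i-th item of c; stop at the shorter list.
        for (int i = 0; i < bArray.length && i < cArray.length; i++) {
            Tuple tuple = TupleFactory.getInstance().newTuple(2);
            tuple.set(0, bArray[i]);
            tuple.set(1, cArray[i]);
            bag.add(tuple);
        }
        return bag;
    }
}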

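-- REGISTER the jar containing the UDF before this line
DEFINE InSequenceJoin mysourcepath.InSequenceJoin();
-- declare chararray so the UDF receives Strings rather than bytearrays
A = LOAD 'file' AS (a:chararray, b:chararray, c:chararray);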
B = FOREACH A GENERATE a, FLATTEN(InSequenceJoin(b,c));
dump B;

If you need to, you could add validation in the UDF to check that the array sizes match. You could also replace the String split I used in the example with whatever parsing you actually require.
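For the size validation, here is a minimal sketch of just the pairing logic in plain Java, with no Pig classes, so it can be tried outside a cluster. The class name InSequencePairs and the choice to throw IllegalArgumentException on a mismatch are my assumptions, not anything Pig requires; inside a real UDF you might prefer to return null or log a warning instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pig: pairs the i-th items of two
// comma-separated lists and rejects lists of different lengths.
final class InSequencePairs {
    private InSequencePairs() {}

    static List<String[]> pair(String b, String c) {
        // A limit of -1 keeps trailing empty strings, so "x,y," counts as three items.
        String[] bArray = b.split(",", -1);
        String[] cArray = c.split(",", -1);
        if (bArray.length != cArray.length) {
            throw new IllegalArgumentException(
                "list sizes differ: " + bArray.length + " vs " + cArray.length);
        }
        List<String[]> pairs = new ArrayList<>(bArray.length);
        for (int i = 0; i < bArray.length; i++) {
            pairs.add(new String[] {bArray[i], cArray[i]});
        }
        return pairs;
    }
}
```

Calling InSequencePairs.pair("b1,b2,b3", "c1,c2,c3") gives three pairs in order, while pair("b1,b2", "c1") throws. The same check could sit at the top of exec before the loop that fills the bag.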

Upvotes: 1

SNeumann

Reputation: 1177

I'd try DataFu's bag UDFs.

Load the data as you've done, then use Enumerate to append an index to each bag element. Then flatten both bags (which gives you the cross join between the bag elements, as you've seen) and filter on the indexes added to the bag elements, keeping only the rows where the two indexes are equal.

See here: https://github.com/linkedin/datafu
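To see why filtering on the index recovers the in-sequence pairs, here is a rough plain-Java model of the enumerate, flatten, filter steps. This is not DataFu or Pig code; the class name EnumerateThenFilter and the tab-joined output format are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; this is not DataFu or Pig code.
final class EnumerateThenFilter {
    private EnumerateThenFilter() {}

    static List<String> rows(String a, String b, String c) {
        String[] bs = b.split(",");
        String[] cs = c.split(",");
        List<String> out = new ArrayList<>();
        // The array positions i and j play the role of the indexes Enumerate adds.
        // The nested loops model the cross product that flattening two bags produces.
        for (int i = 0; i < bs.length; i++) {
            for (int j = 0; j < cs.length; j++) {
                // Keeping only equal indexes models the filter step.
                if (i == j) {
                    out.add(a + "\t" + bs[i] + "\t" + cs[j]);
                }
            }
        }
        return out;
    }
}
```

Note that the cross product is built before the filter runs, so for lists of length n this approach produces n*n intermediate rows per input line, whereas a UDF that pairs elements directly produces n.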

Upvotes: 0
