Manny

Reputation: 195

Pig: Get index in nested foreach

I have a Pig script with code like:

scores = LOAD 'file' as (id:chararray, scoreid:chararray, score:int);
scoresGrouped = GROUP scores by id;
top10s = foreach scoresGrouped{
    sorted = order scores by score DESC;
    sorted10 = LIMIT sorted 10;
    GENERATE group as id, sorted10.scoreid as top10candidates;
};

It gets me a bag like

 id1, {(scoreidA),(scoreidB),(scoreIdC)..(scoreIdFoo)}

However, I wish to include the index of items as well, so I'd have results like

 id1, {(scoreidA,1),(scoreidB,2),(scoreIdC,3)..(scoreIdFoo,10)}

Is it possible to include the index somehow in the nested foreach, or would I have to write my own UDF to add it in afterwards?

Upvotes: 1

Views: 2755

Answers (2)

Lorand Bendig

Reputation: 10650

For indexing elements in a bag you may use the Enumerate UDF from LinkedIn's DataFu project:

register '/path_to_jar/datafu-0.0.4.jar';
define Enumerate datafu.pig.bags.Enumerate('1');
scores = ...
...
result = foreach top10s generate id, Enumerate(top10candidates);

Upvotes: 2

reo katoa

Reputation: 5801

You'll need a UDF, whose only argument is the sorted bag you want to add a rank to. I've had the same need before. Here's the exec function to save you a little time:

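// Needs org.apache.pig.data.{BagFactory, DataBag, Tuple}, org.apache.pig.PigException,
// org.apache.pig.backend.executionengine.ExecException, java.util.Iterator, java.io.IOException.
public DataBag exec(Tuple b) throws IOException {
    // Fresh output bag and counter on every call, so each input bag is numbered from 1.
    DataBag newBag = BagFactory.getInstance().newDefaultBag();
    long n = 1L;
    try {
        DataBag bag = (DataBag) b.get(0);
        Iterator<Tuple> it = bag.iterator();
        while (it.hasNext()) {
            Tuple t = it.next();
            // Only non-empty tuples get a rank appended; every tuple is still kept.
            if (t != null && t.size() > 0 && t.get(0) != null) {
                t.append(n++);
            }
            newBag.add(t);
        }
    } catch (ExecException ee) {
        throw ee;
    } catch (Exception e) {
        int errCode = 2106;
        String msg = "Error while computing item number in " + this.getClass().getSimpleName();
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }

    return newBag;
}

(The counter n and the output bag newBag are created inside exec here, so the numbering restarts for every input bag. If you keep them as class variables instead, reset both at the start of each call.)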

You can also implement the Accumulator interface, which lets Pig hand the bag to the UDF in chunks, so this works even if your entire bag won't fit in memory. (The COUNT built-in function does this.) In that version n and newBag are class variables: add the tuples in accumulate(), return newBag from getValue(), and reset n to 1L (and newBag to an empty bag) in cleanup(). Everything else is the same.
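To make the accumulate / getValue / cleanup lifecycle concrete, here is a minimal plain-Java sketch of the idea. It is not Pig code: the class name RankAccumulatorSketch is made up for illustration, List<Object> stands in for a Pig Tuple, and List<List<Object>> stands in for a DataBag. It only shows how the counter carries over between chunks of one bag and is reset before the next key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Accumulator-style UDF. No Pig classes are used:
// List<Object> plays the role of a Tuple, List<List<Object>> the role of a DataBag.
class RankAccumulatorSketch {
    private long n = 1L;                                    // next rank to assign
    private List<List<Object>> newBag = new ArrayList<>();  // ranked output so far

    // Called once per chunk of the (already sorted) bag for the current key.
    void accumulate(List<List<Object>> chunk) {
        for (List<Object> t : chunk) {
            List<Object> ranked = new ArrayList<>(t);  // copy the tuple's fields
            ranked.add(n++);                           // append the running rank
            newBag.add(ranked);
        }
    }

    // Called after the last chunk of the current key has been accumulated.
    List<List<Object>> getValue() {
        return newBag;
    }

    // Called before the next key starts: reset the counter and the output.
    void cleanup() {
        n = 1L;
        newBag = new ArrayList<>();
    }
}
```

In a real UDF the same three methods would come from Pig's Accumulator interface and operate on DataBag and Tuple. If cleanup() did not reset n, the ranks for the second id would continue from where the first id stopped.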

Upvotes: 2
