PokerIncome.com

Reputation: 1708

How to write this Pig query?

I have a many-to-many mapping table between two collections. Each row in the mapping table represents a possible mapping with a weight score.

mapping(id1, id2, weight)

Query: generate a one-to-one mapping between id1 and id2. When an id appears in more than one mapping, keep the mapping with the lowest weight. If there is a tie, output any one of the tied mappings.

Example input:

(1, X, 1)
(1, Y, 2)
(2, X, 3)
(2, Y, 1)
(3, Z, 2)

Expected output:

(1, X)
(2, Y)
(3, Z)

Both 1 and 2 are mapped to both X and Y. We pick the mappings (1, X) and (2, Y) because they have the lowest weights.

Upvotes: 1

Views: 726

Answers (2)

reo katoa

Reputation: 5801

I will assume that you are only interested in mappings whose weight is the lowest of any mapping involving that id1, and also the lowest of any mapping involving that id2. For example, if you additionally had the mapping (2, Y, 4), it would not conflict with (1, X, 1). I will still exclude such mappings, because the weight is larger than those of (1, Y, 2) and (2, X, 3), which were disqualified.

My solution proceeds as follows: find the minimum mapping weight for each id1, and then join that into the mapping relation for future reference. Use a nested foreach to go through each id2: use ORDER and LIMIT to select the record with the smallest weight for that id2, and then only keep it if the weight is also the minimum for that id1.

Here is the full script, tested on your input:

mapping = LOAD 'input' AS (id1:chararray, id2:chararray, weight:double);

id1_weights =
    FOREACH (GROUP mapping BY id1)
    GENERATE group AS id1, MIN(mapping.weight) AS id1_min_weight;
mapping_with_id1_mins =
    FOREACH (JOIN mapping BY id1, id1_weights BY id1)
    GENERATE mapping::id1, id2, weight, id1_min_weight;

accepted_mappings =
    FOREACH (GROUP mapping_with_id1_mins BY id2)
    {
        ordered = ORDER mapping_with_id1_mins BY weight;
        selected = LIMIT ordered 1;
        acceptable = FILTER selected BY weight == id1_min_weight;
        GENERATE FLATTEN(acceptable);
    };

DUMP accepted_mappings;
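
To make the selection rule easier to check by hand, here is a minimal in-memory sketch of the same idea in plain Java. It is not part of Pig or of the script above: the class MutualMinSketch, the Mapping type, and the select method are hypothetical names used only for illustration. One assumption differs from the script: on a tie for an id2, this sketch keeps the first row it sees, whereas ORDER followed by LIMIT 1 in Pig may keep any of the tied rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration; not part of Pig or of the script above.
class MutualMinSketch {
    // One row of the mapping relation.
    static final class Mapping {
        final String id1;
        final String id2;
        final double weight;

        Mapping(String id1, String id2, double weight) {
            this.id1 = id1;
            this.id2 = id2;
            this.weight = weight;
        }

        @Override
        public String toString() {
            return "(" + id1 + ", " + id2 + ")";
        }
    }

    // Keeps a row only if it is the lowest-weight row for its id2
    // and its weight equals the minimum weight of its id1.
    static List<Mapping> select(List<Mapping> rows) {
        Map<String, Double> id1Min = new HashMap<>();
        Map<String, Mapping> id2Best = new LinkedHashMap<>();
        for (Mapping m : rows) {
            // Minimum weight per id1 (the id1_weights relation in the script).
            id1Min.merge(m.id1, m.weight, Math::min);
            // Lowest-weight row per id2 (ORDER + LIMIT 1); first row wins a tie here.
            Mapping best = id2Best.get(m.id2);
            if (best == null || m.weight < best.weight) {
                id2Best.put(m.id2, m);
            }
        }
        List<Mapping> accepted = new ArrayList<>();
        for (Mapping m : id2Best.values()) {
            // The FILTER step: keep the row only if it is also minimal for its id1.
            if (m.weight == id1Min.get(m.id1)) {
                accepted.add(m);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Mapping> rows = Arrays.asList(
            new Mapping("1", "X", 1), new Mapping("1", "Y", 2),
            new Mapping("2", "X", 3), new Mapping("2", "Y", 1),
            new Mapping("3", "Z", 2));
        System.out.println(select(rows)); // prints [(1, X), (2, Y), (3, Z)]
    }
}
```

Note that a row which is the best for its id2 but not for its id1 is dropped without a replacement, so some ids can end up with no mapping at all.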

Upvotes: 2

PokerIncome.com

Reputation: 1708

I solved it with a Java UDF. It's not perfect, in the sense that it won't maximize the number of one-to-one mappings, but it's good enough.

Pig:

d = load 'test' as (fid, iid, priority:double);
g = group d by fid;
o = foreach g generate FLATTEN(com.propeld.pig.DEDUP(d)) as (fid, iid, priority);
store o into 'output';

g2 = group o by iid;
o2 = foreach g2 generate FLATTEN(com.propeld.pig.DEDUP(o)) as (fid, iid, priority);
store o2 into 'output2';

Java UDF:

package com.propeld.pig;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class DEDUP extends EvalFunc<Tuple> implements Algebraic{
    public String getInitial() {return Initial.class.getName();}
    public String getIntermed() {return Intermed.class.getName();}
    public String getFinal() {return Final.class.getName();}
    static public class Initial extends EvalFunc<Tuple> {
        private static TupleFactory tfact = TupleFactory.getInstance();
        public Tuple exec(Tuple input) throws IOException {
            // Initial is called in the map.
            // we just send the tuple down
            try {
                // input is a bag with one tuple containing
                // the column we are trying to operate on
                DataBag bg = (DataBag) input.get(0);
                // Create the iterator once; each call to iterator() starts a new pass over the bag.
                Iterator<Tuple> it = bg.iterator();
                if (it.hasNext()) {
                    return it.next();
                } else {
                    // make sure that we call the object constructor, not the list constructor
                    return tfact.newTuple((Object) null);
                }
            } catch (ExecException e) {
                throw e;
            } catch (Exception e) {
                int errCode = 2106;
                throw new ExecException("Error executing an algebraic function", errCode, PigException.BUG, e);
            }
        }
    }
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return dedup(input);
        }
    }
    static public class Final extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {return dedup(input);}
    }

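    // Returns the tuple with the smallest weight (field 2) in the bag at input.get(0),
    // or null if no tuple in the bag carries a weight.
    static protected Tuple dedup(Tuple input) throws ExecException, NumberFormatException {
        DataBag values = (DataBag)input.get(0);
        double min = Double.MAX_VALUE;
        Tuple result = null;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            // Skip the one-field placeholder that Initial emits for an empty bag,
            // and any tuple whose weight is null.
            if (t == null || t.size() < 3 || t.get(2) == null) {
                continue;
            }
            double weight = (Double)t.get(2);
            if (result == null || weight < min){
                min = weight;
                result = t;
            }
        }
        return result;
    }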

    @Override
    public Tuple exec(Tuple input) throws IOException {
        return dedup(input);
    }
}
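
The two-pass logic of the Pig script above (keep the lowest-priority row per fid, then the lowest-priority row per iid among the survivors) can also be tried outside Pig. The following is only a rough sketch under that reading of the script: TwoPassDedupSketch, Row, minPerKey, and twoPass are hypothetical names, and ties are resolved by input order here, which Pig does not guarantee for bags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory illustration of the two-pass approach; not the UDF itself.
class TwoPassDedupSketch {
    static final class Row {
        final String fid;
        final String iid;
        final double priority;

        Row(String fid, String iid, double priority) {
            this.fid = fid;
            this.iid = iid;
            this.priority = priority;
        }

        @Override
        public String toString() {
            return "(" + fid + ", " + iid + ")";
        }
    }

    // For each key, keeps the first row with the smallest priority.
    static List<Row> minPerKey(List<Row> rows, Function<Row, String> key) {
        Map<String, Row> best = new LinkedHashMap<>();
        for (Row r : rows) {
            Row current = best.get(key.apply(r));
            if (current == null || r.priority < current.priority) {
                best.put(key.apply(r), r);
            }
        }
        return new ArrayList<>(best.values());
    }

    // Pass 1 groups by fid, pass 2 groups the survivors by iid.
    static List<Row> twoPass(List<Row> rows) {
        List<Row> perFid = minPerKey(rows, r -> r.fid);
        return minPerKey(perFid, r -> r.iid);
    }

    public static void main(String[] args) {
        // The example from the question.
        System.out.println(twoPass(Arrays.asList(
            new Row("1", "X", 1), new Row("1", "Y", 2),
            new Row("2", "X", 3), new Row("2", "Y", 1),
            new Row("3", "Z", 2)))); // prints [(1, X), (2, Y), (3, Z)]

        // A case where the result is not the largest possible one-to-one mapping.
        System.out.println(twoPass(Arrays.asList(
            new Row("1", "X", 1), new Row("1", "Y", 2),
            new Row("2", "X", 3)))); // prints [(1, X)]
    }
}
```

The second input shows the limitation mentioned above: the pairs (1, Y) and (2, X) would form two one-to-one mappings, but the greedy passes keep only (1, X).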

Upvotes: 0
