Chad
Chad

Reputation: 493

Filtering time based data in Pig

I'm using Pig 0.11.1 in local mode for now, loading data from a CSV.

So far, I've been able to load our data set and perform the required calculations on it. The next step is to take some samples from the data and perform the same calculations. To replicate existing processes, we want to grab a data point every fifteen minutes.

This is where the trouble comes in. I can write a filter in Pig that will match if a data point is exactly on a fifteen minute interval, but how would I grab data points that are near the fifteen minute boundary?

I need to look at the fifteen minute mark and grab the record that's there. If there is no record right on that mark (most likely), then I need to grab the next record after the mark.

I think I'll need to write my own Filter UDF, but it seems like the UDF would need to be stateful so that it knows when it's found the first match after the time interval. I haven't been able to find any examples of stateful UDFs, and from what I can tell it's probably a bad idea given that we won't know how data is mapped/reduced when eventually run against Hadoop.

I could do this in a couple of steps, by storing key/timestamp values and writing a Python script that would parse those. I'd really like to keep as much of this process in Pig as possible, though.

Edit: The data at its most basic is like this: {id:long, timestamp:long}. The timestamp is in milliseconds. Each set of data is sorted on timestamp. If record X falls exactly on a 15-minute boundary after the minimum timestamp (start time), grab it. Otherwise, grab the very next record after that 15 minute boundary, whenever that might be. I don't have a good example of what the expected results are because I haven't had time to sort through the data by hand.

Upvotes: 0

Views: 1154

Answers (1)

alexeipab
alexeipab

Reputation: 3619

It might be tricky in MapReduce to satisfy the condition "Otherwise, grab the very next record after that 15 minute boundary, whenever that might be", But if you change it slightly to the "grab the previous record before that 15 minute boundary" than it could be quite easy. The idea is that 15 minutes is 900000 milliseconds, so that we can group the records into the groups which cover 900000 milliseconds, sort them and take the top one. Here is an example of the script from the top of my head:

inpt = LOAD '....' AS (id:long, timestamp:long);
intervals = FOREACH inpt GENERATE id, timestamp, timestamp / 900000 as interval;
grp = GROUP intervals BY interval;
result = FOREACH grp {
    sort = ORDER intervals BY timestamp DESC;
    top = LIMIT ord 1;
    GENERATE FLATTEN(top);
};

Upvotes: 1

Related Questions