Bruce.Isyoung

Reputation: 1

Q: How to unnest bags from a complicated data structure in Pig

Originally, I have a structure like this:

```
+-------+-------+----+----+----+-----+
| time  | type  | s1 | s2 | id | p1  |
+-------+-------+----+----+----+-----+
| 10:30 | send  | a  | b  |  1 | 110 |
| 10:35 | send  | c  | d  |  1 | 120 |
| 10:31 | reply | e  | f  |  3 | 221 |
| 10:33 | reply | a  | c  |  1 | 210 |
| 10:34 | send  | a  | a  |  3 | 113 |
| 10:32 | reply | c  | d  |  3 | 157 |
+-------+-------+----+----+----+-----+
```

I want to normalize the table:

  1. group the entries by id,
  2. within each group, find the oldest entry of type send,
  3. replace s1 and s2 of the other entries with the values from that oldest send entry.
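To pin down the intended semantics, here is a plain-Python model of the three steps (not Pig; it only mirrors the logic) run on the sample rows above. It assumes times are zero-padded `HH:MM` strings, so string comparison is chronological, and it drops any id that has no send entry; that last choice is an assumption, made because there is nothing to copy s1/s2 from in that case.

```python
# Plain-Python model of the normalization (not Pig code).
# Rows are (time, type, s1, s2, id, p1); zero-padded "HH:MM" strings
# compare chronologically as plain strings.
from itertools import groupby

events = [
    ("10:30", "send",  "a", "b", 1, 110),
    ("10:35", "send",  "c", "d", 1, 120),
    ("10:31", "reply", "e", "f", 3, 221),
    ("10:33", "reply", "a", "c", 1, 210),
    ("10:34", "send",  "a", "a", 3, 113),
    ("10:32", "reply", "c", "d", 3, 157),
]


def normalize(rows):
    out = []

    def by_id(r):
        return r[4]

    # Step 1: group the entries by id (groupby needs input sorted by the key).
    for _, group in groupby(sorted(rows, key=by_id), key=by_id):
        group = list(group)
        # Step 2: find the oldest entry of type "send" in this group.
        sends = [r for r in group if r[1] == "send"]
        if not sends:
            continue  # assumption: an id without any send entry is dropped
        oldest = min(sends, key=lambda r: r[0])
        # Step 3: overwrite s1, s2 with the oldest send entry's values.
        for time, typ, _s1, _s2, id_, p1 in group:
            out.append((time, typ, oldest[2], oldest[3], id_, p1))
    return out


for row in normalize(events):
    print(row)
```

Because `sorted` is stable, rows keep their original relative order within each id.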

After normalization, the table should look like this:

```
+-------+-------+----+----+----+-----+
| time  | type  | s1 | s2 | id | p1  |
+-------+-------+----+----+----+-----+
| 10:30 | send  | a  | b  |  1 | 110 |
| 10:35 | send  | a  | b  |  1 | 120 |
| 10:33 | reply | a  | b  |  1 | 210 |
| 10:31 | reply | a  | a  |  3 | 221 |
| 10:34 | send  | a  | a  |  3 | 113 |
| 10:32 | reply | a  | a  |  3 | 157 |
+-------+-------+----+----+----+-----+
```

This is how I tried to tackle the problem:

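```
-- Collect all events that share the same id into one bag.
events_groupby_id = GROUP events BY id;
events_normalized = FOREACH events_groupby_id {
   -- Keep only the send entries of this group.
   f_reqs = FILTER events BY type matches 'send';
   -- Order the filtered send entries (not all events) so LIMIT picks the oldest send.
   o_reqs = ORDER f_reqs BY time ASC;
   -- A bag holding at most one tuple: the oldest send entry.
   req = LIMIT o_reqs 1;
   GENERATE req, events;
};
```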
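As a rough illustration of why the result is nested, this plain-Python model (not Pig) builds what each `events_normalized` record holds: a pair of bags, `req` with at most one tuple and `events` with the whole group. Lists stand in for bags, and the groups are sorted by id only to make the output deterministic (Pig does not guarantee group order). With the sample data, ordering the unfiltered bag instead would pick the 10:31 reply for id 3.

```python
# Plain-Python model (not Pig) of the nested FOREACH: one (req, events) pair per id.
from collections import defaultdict

events = [
    ("10:30", "send",  "a", "b", 1, 110),
    ("10:35", "send",  "c", "d", 1, 120),
    ("10:31", "reply", "e", "f", 3, 221),
    ("10:33", "reply", "a", "c", 1, 210),
    ("10:34", "send",  "a", "a", 3, 113),
    ("10:32", "reply", "c", "d", 3, 157),
]


def nested_foreach(rows):
    groups = defaultdict(list)                      # GROUP events BY id
    for r in rows:
        groups[r[4]].append(r)
    result = []
    for id_ in sorted(groups):                      # sorted only for determinism
        bag = groups[id_]
        f_reqs = [r for r in bag if r[1] == "send"]  # FILTER ... 'send'
        o_reqs = sorted(f_reqs, key=lambda r: r[0])  # ORDER f_reqs BY time ASC
        req = o_reqs[:1]                             # LIMIT o_reqs 1
        result.append((req, bag))                    # GENERATE req, events
    return result


for req, bag in nested_foreach(events):
    print(req, len(bag))
```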

I am stuck here, because events_normalized has become a complicated structure with nested bags, and I don't know how to flatten it correctly:

```
events_normalized | req:bag{:tuple()} | events:bag{:tuple()}
```

From here, what should I do to get the data structure I want? I would really appreciate any help. Thank you.

Upvotes: 0

Views: 537

Answers (1)

savagedata

Reputation: 722

You can unnest the bags in events_normalized using FLATTEN:

```
events_flattened = FOREACH events_normalized GENERATE
    FLATTEN(req),
    FLATTEN(events);
```

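This creates a cross product between req and events, but since req contains only one tuple, you end up with exactly one record for each of your original entries. (If a group had no send entries, req would be empty and FLATTEN would drop all of that group's records.) The schema for events_flattened is: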

```
req::time | req::type | req::s1 | req::s2 | req::id | req::p1 | events::time | events::type | events::s1 | events::s2 | events::id | events::p1
```

Now you can select the fields you want to keep, using the events:: prefix for the original entries and the req:: prefix for the replacement values from the oldest send entry:

```
final = FOREACH events_flattened GENERATE
    events::time AS time,
    events::type AS type,
    req::s1 AS s1,
    req::s2 AS s2,
    events::id AS id,
    events::p1 AS p1;
```
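To see why flattening two bags in one GENERATE gives one row per event here, this plain-Python model (not Pig) takes the id-1 record from the question's sample data, forms the cross product of the two bags with `itertools.product`, and then applies the final projection by position (`req::` fields are positions 0 to 5, `events::` fields are 6 to 11). The helper names `flatten_both` and `project` are made up for this sketch.

```python
# Plain-Python model (not Pig) of FLATTEN(req), FLATTEN(events) plus the final projection.
from itertools import product

# The two bags of the id-1 record from the sample data.
req = [("10:30", "send", "a", "b", 1, 110)]
events = [
    ("10:30", "send",  "a", "b", 1, 110),
    ("10:35", "send",  "c", "d", 1, 120),
    ("10:33", "reply", "a", "c", 1, 210),
]


def flatten_both(req_bag, events_bag):
    """Cross product of the two bags; each output row is req tuple + events tuple."""
    return [r + e for r, e in product(req_bag, events_bag)]


def project(row):
    """events::time, events::type, req::s1, req::s2, events::id, events::p1."""
    return (row[6], row[7], row[2], row[3], row[10], row[11])


final = [project(row) for row in flatten_both(req, events)]
for row in final:
    print(row)

# An empty req bag (a group with no send entry) produces no rows at all.
print(flatten_both([], events))
```

With a single tuple in `req`, the cross product has exactly `len(events)` rows, which is why the flattened relation lines up one-to-one with the original entries.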

Upvotes: 1
