Patrick the Cat

Reputation: 2168

Understanding Apache Pig Performance

There are several parts involved in the queries in this question. First there are two schemas:

data = FOREACH source GENERATE
    json#'id_str' AS post_id:chararray,
    json#'user'#'id_str' AS user_id:chararray,
    json#'created_at_str' AS tstamp_str:chararray,
    FLATTEN(json#'entities'#'hashtags') AS hashtag:chararray;

stopwords = FOREACH another_source GENERATE
    token AS stopword_hashtag:chararray;

The stopwords table contains some duplicates and nulls, so the first thing I did was:

stopwords = FILTER stopwords BY stopword_hashtag IS NOT NULL;
stopwords = DISTINCT stopwords;

Then I want to filter the stopwords out of the data's hashtags, so I do the join with a filter, followed by a projection back to data:

joined = JOIN data BY hashtag LEFT, stopwords BY stopword_hashtag;
joined = FILTER joined BY stopwords::stopword_hashtag IS NULL;

data = FOREACH joined GENERATE
    data::post_id AS post_id:chararray,
    data::user_id AS user_id:chararray,
    parse_time(data::tstamp_str) AS tstamp:long,
    data::hashtag AS hashtag:chararray;

parse_time(chararray) is a Jython UDF I wrote to convert a datetime string into a UNIX timestamp long (a sketch of how such a UDF is registered, along with a possible built-in alternative, follows the group+sort below). After all of this I do a group+sort:

user_groups = FOREACH (GROUP data BY user_id) GENERATE
    group AS user_id:chararray,
    data.(hashtag, tstamp) AS time_series:bag{tuple:(tag:chararray,tstamp:long)};
user_groups = FOREACH user_groups {
    sorted = ORDER time_series BY tstamp ASC;
    GENERATE user_id, sorted;
}
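
For reference, here is a minimal sketch of how a Jython UDF like parse_time is typically registered, plus a possible alternative using Pig's built-in ToDate/ToUnixTime. The file name 'parse_time_udf.py' and the Twitter-style created_at format are assumptions, not the actual code I used:

-- Hypothetical registration of the Jython UDF (functions would then be invoked as time_udfs.parse_time(...)):
REGISTER 'parse_time_udf.py' USING jython AS time_udfs;

-- Without a UDF, Pig 0.11+ built-ins could do the same conversion,
-- assuming the created_at strings use the 'EEE MMM dd HH:mm:ss Z yyyy' layout:
data = FOREACH joined GENERATE
    data::post_id AS post_id:chararray,
    data::user_id AS user_id:chararray,
    ToUnixTime(ToDate(data::tstamp_str, 'EEE MMM dd HH:mm:ss Z yyyy')) AS tstamp:long,
    data::hashtag AS hashtag:chararray;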

All of this is in one Pig script. I am having performance issues running this process on large data. I know it works on small toy examples, but for large inputs, where both data and stopwords are big, it takes up too much memory and becomes very slow.

As far as I can tell, I am filtering as early as possible and doing only a left join. Any suggestions for performance optimization?


My apologies for the late update. I have tried @Ran Locar's approach, and it works like a charm!

There was a bug in Pig during the filter step:

output = FILTER data_with_count BY from_stopwords_count==0;

which reported

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long

It took me some time to figure out a workaround (presumably the mismatch arises because SUM over int fields produces a long, while the literal 0 is an int). I solved it by explicitly casting both from_stopwords_count and 0 to long, that is:

data_with_count = FOREACH data_with_count GENERATE
    $0.., (long) from_stopwords_count AS from_stopwords_count:long;
output = FILTER data_with_count BY from_stopwords_count == 0L;

I guess I could have cast both to int, which might have given a slight performance improvement, but I felt safer with long given the size of the tables I have.

Some statistics of my inputs:

  1. source of data is 1.1TB in gzip format
  2. source of stopwords is 400GB in gzip format, but I am really only using one column of it.

The query ran for 2.3 hours, using 550GB of memory and 180 vcores, and completed successfully, which really saved my day!

Upvotes: 1

Views: 172

Answers (1)

Ran Locar

Reputation: 561

There is a way to avoid the join between stopwords and data altogether. Essentially what you need to do is:

  • No need for the DISTINCT, or the filtering of stop words.
  • get DATA and STOPWORDS to share the same schema

    stopwords = FOREACH another_source GENERATE
        '' as post_id,
        '' as user_id,
        '' as tstamp_str,
        token AS hashtag:chararray,
        0 as from_data,
        1 as from_stopwords;
    
  • add the from_data and from_stopwords fields to the schema of data

    data = FOREACH data GENERATE
        $0..,
        1 as from_data,
        0 as from_stopwords;
    
  • UNION the two relations

    data_with_stopwords = UNION data, stopwords;
    
  • group by the hashtag field. In each group you will have rows from both relations.

    data_with_stopwords_and_counts = FOREACH (GROUP data_with_stopwords BY hashtag) GENERATE
        $0..,
        SUM(data_with_stopwords.from_data) as from_data_sum,
        SUM(data_with_stopwords.from_stopwords) as from_stopwords_sum;
    

Each row will now have the group key, the bag of rows belonging to the group, and the two sums. You want only the rows where from_stopwords_sum == 0, as those hashtags do not appear in the stop words list (this is why you no longer need the DISTINCT: you don't mind a word appearing in the stop words more than once; its group simply gets from_stopwords_sum >= 1 and you disregard it).

  • filter by from_stopwords_sum == 0 and flatten the list

    data_with_stopwords_and_counts = FOREACH (FILTER data_with_stopwords_and_counts BY from_stopwords_sum == 0) GENERATE
        FLATTEN($1);
    

This approach replaces the JOIN with a GROUP BY. When I tested it on many, many GBs of data, it finished in under 10 minutes, compared to a few hours using joins. Your rows will still carry the extra helper fields at the end, but you can remove them afterwards.
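
For instance, a final projection along these lines would drop them (a minimal sketch; the field positions are assumed from the shared schema built above):

    -- keep only the original data columns, dropping the from_data/from_stopwords flags
    output = FOREACH data_with_stopwords_and_counts GENERATE
        $0 AS post_id:chararray,
        $1 AS user_id:chararray,
        $2 AS tstamp_str:chararray,
        $3 AS hashtag:chararray;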

Upvotes: 3
