Reputation: 2168
There are several parts involved in the queries in this question. First there are two schemas:
data = FOREACH source GENERATE
json#'id_str' AS post_id:chararray,
json#'user'#'id_str' AS user_id:chararray,
json#'created_at_str' AS tstamp_str:chararray,
FLATTEN(json#'entities'#'hashtags') AS hashtag:chararray;
stopwords = FOREACH another_source GENERATE
token AS stopword_hashtag:chararray;
The stopwords table contains some duplicates and nulls, so the first thing I did is:
stopwords = FILTER stopwords BY stopword_hashtag IS NOT NULL;
stopwords = DISTINCT stopwords;
Then I want to filter out the stopwords from data's hashtags, so I do the join with a filter, followed by a projection back to data:
joined = JOIN data BY hashtag LEFT, stopwords BY stopword_hashtag;
joined = FILTER joined BY stopwords::stopword_hashtag IS NULL;
data = FOREACH joined GENERATE
data::post_id AS post_id:chararray,
data::user_id AS user_id:chararray,
parse_time(data::tstamp_str) AS tstamp:long,
data::hashtag AS hashtag:chararray;
parse_time(chararray) is a Jython UDF I wrote to convert a datetime string into a UNIX timestamp (long). After all of this I do a group + sort:
user_groups = FOREACH (GROUP data BY user_id) GENERATE
group AS user_id:chararray,
data.(hashtag, tstamp) AS time_series:bag{tuple:(tag:chararray,tstamp:long)};
user_groups = FOREACH user_groups {
sorted = ORDER time_series BY tstamp ASC;
GENERATE user_id, sorted;
}
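As a side note on parse_time (not part of the original script): if tstamp_str is in Twitter's standard created_at format, Pig's built-in ToDate and ToUnixTime can do roughly the same conversion. A minimal sketch, where the format string and the data_alt alias are assumptions for illustration only:
-- Sketch only: built-in alternative to the parse_time UDF, assuming tstamp_str
-- looks like 'Wed Aug 27 13:08:45 +0000 2008'; the real UDF may handle more cases.
data_alt = FOREACH joined GENERATE
    data::post_id AS post_id:chararray,
    data::user_id AS user_id:chararray,
    ToUnixTime(ToDate(data::tstamp_str, 'EEE MMM dd HH:mm:ss Z yyyy')) AS tstamp:long,
    data::hashtag AS hashtag:chararray;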
All of this is in one Pig script. I am having performance issues running this process on large data. I know it works on small toy examples, but when both data and stopwords are large, it takes up too much memory and becomes very slow.
As far as I can tell, I am filtering as early as possible and only doing a left join. Any suggestions for performance optimization?
My apologies for the late update. I tried @Ran Locar's approach, and it works like a charm!
There was a bug in Pig during the filter step:
output = FILTER data_with_count BY from_stopwords_count==0;
which reported
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
It took me some time to figure out a workaround. I solved it by explicitly casting both from_stopwords_count and 0 to long, that is:
data_with_count = FOREACH data_with_count GENERATE
$0.., (long) from_stopwords_count AS from_stopwords_count:long;
output = FILTER data_with_count BY from_stopwords_count==0L;
I guess I could have cast both to int, which might have improved performance slightly, but I feel safer with long given the size of the table I have.
Some statistics of my inputs:
The query runs for 2.3 hours, using 550 GB of memory and 180 vcores, and completes successfully, which really saved my day!
Upvotes: 1
Views: 172
Reputation: 561
There is a way to avoid the join between stopwords and data altogether. Essentially what you need to do is:
get DATA and STOPWORDS to share the same schema
stopwords = FOREACH another_source GENERATE
'' as post_id,
'' as user_id,
'' as tstamp_str,
token AS hashtag:chararray,
0 as from_data,
1 as from_stopwords;
add the from_data and from_stopwords fields to the schema of data
data = FOREACH data GENERATE
$0..,
1 as from_data,
0 as from_stopwords;
UNION the two relations
data_with_stopwords = UNION data, stopwords;
group by hashtag. In each group you will have rows from both relations.
data_with_stopwords_and_counts = foreach (group data_with_stopwords by hashtag) generate
$0.., SUM(data_with_stopwords.from_data) as from_data_sum, SUM(data_with_stopwords.from_stopwords) as from_stopwords_sum;
each row will now have the group key, the bag of all rows belonging to the group, and two numbers. You want only the groups where from_stopwords_sum == 0, as those hashtags do not appear in the stop-word list. (This is also why you no longer need the DISTINCT: you don't mind a word appearing in the stop words more than once; its group simply gets from_stopwords_sum >= 1 and is disregarded.)
filter by from_stopwords_sum == 0 and flatten the list
data_with_stopwords_and_counts = foreach (filter data_with_stopwords_and_counts by from_stopwords_sum==0) generate
flatten($1);
This approach replaces the JOIN with a GROUP BY. When I tested it on many, many GBs of data, it finished in under 10 minutes, compared to a few hours using joins. Your rows will keep the extra from_data and from_stopwords fields at the end, but you can remove them afterwards with one more projection (see the sketch below).
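A minimal sketch of that cleanup, assuming the flattened rows keep the original data columns in positions 0 through 3 followed by the two flag fields (clean_data is just an illustrative alias):
-- keep only the original data columns; drop from_data / from_stopwords
clean_data = FOREACH data_with_stopwords_and_counts GENERATE $0..$3;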
Upvotes: 3