Reputation: 2182
Here is the idea of my code:
I have a large RDD of email data, called email. It contains about 700 million emails and looks like this:
[['value1','value2','value3','value4'],['recipient1','recipient2','recipient3'],['sender']]
There are over 40,000 distinct recipient and sender email addresses in email. I have a list of 600 email addresses I am interested in, shown below:
relevant_emails = ['rel_email1','rel_email2','rel_email3',...,'rel_email600']
I want to go through my large RDD email and keep only those emails where both the sender and at least one of the recipients fall in the list of relevant_emails. So, I broadcast relevant_emails so that each worker node has a copy:
broadcast_emails = sc.broadcast(relevant_emails)
Here is the function that I want to apply to each row in email:
def get_relevant_emails(row):
    r_bool = False
    s_bool = False
    recipients = row[1]
    sender = row[2]
    # sender is a single-element list, e.g. ['sender']
    if sender[0] in broadcast_emails.value:
        s_bool = True
    # match if at least one recipient is in the relevant list
    for x in range(0, len(recipients)):
        if recipients[x] in broadcast_emails.value:
            r_bool = True
            break
    if r_bool is True and s_bool is True:
        return row
The problem I face is that when I run email.map(lambda row: get_relevant_emails(row)) (and then follow it up with something that forces it to execute, such as saveAsTextFile()), it starts to run and then prints this:
WARN: Stage 5 contains a task of very large size (xxxx KB). The maximum recommended task size is 100 KB
Then it stops running. FYI: I am running this in a Spark shell, with 20 executors, 10 GB of memory per executor, and 3 cores per executor. email is 76.7 GB in terms of block storage consumption on HDFS, and I've got it in 600 partitions (76.7 GB / 128 MB).
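For reference, the end-to-end flow I am going for looks roughly like this (just a sketch; the output path is a placeholder, and the filter() step drops the None values that map() returns for non-matching rows):
# Sketch of the intended pipeline (output path is a placeholder).
broadcast_emails = sc.broadcast(relevant_emails)

# get_relevant_emails() returns None for rows that don't match,
# so drop those before writing the result out.
relevant = email.map(get_relevant_emails).filter(lambda row: row is not None)
relevant.saveAsTextFile("hdfs:///tmp/relevant_emails_output")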
Upvotes: 1
Views: 1208
Reputation: 680
The task size that the warning refers to is the size of the serialized task, including the closure of get_relevant_emails(). It grows with the variables the function carries with it, and it can exceed the recommended maximum when the function references large variables defined outside its own scope, since those get serialized into every task.
In any case, I would recommend using the DataFrame API, as it makes this operation simpler and it will perform better. It is faster because it can do all of the heavy lifting on the JVM side and avoids marshalling data back and forth between the Python and Java VMs. My team and I moved much of our existing Python logic into Spark SQL and DataFrames and saw massive performance improvements.
Here's how it could work for your case:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast, expr

sc = SparkContext()
sql_ctx = SQLContext(sc)

# Toy data with the same shape as your RDD: [values, recipients, sender]
email = [
    [['value1','value2','value3','value4'], ['recipient1','recipient2','recipient3'], ['sender1']],
    [['value1','value2','value3','value4'], ['recipient1','recipient2','recipient3'], ['sender2']],
    [['value1','value2','value3','value4'], ['recipient1','recipient4','recipient5'], ['sender3']]
]
relevant_addresses = [
    ["sender2"],
    ["sender3"],
    ["recipient3"]
]

email_df = sql_ctx.createDataFrame(email, ["values", "recipients", "sender"])
relevant_df = sql_ctx.createDataFrame(relevant_addresses, ["address"])

# Hint Spark to ship the small table to every executor instead of shuffling
broadcasted_relevant = broadcast(relevant_df)

# Left-semi join: keep an email row if any relevant address appears among
# its recipients or as its sender
result = email_df.join(
    broadcasted_relevant,
    on=expr("array_contains(recipients, address) OR array_contains(sender, address)"),
    how="leftsemi"
)
result.collect()
The left-semi join here acts like a filter and only selects the matching rows from email_df. It's the same kind of join that takes place when you use a WHERE ... IN (subquery) clause in SQL.
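To make that equivalence concrete, here is a rough sketch of the same semi-join written in SQL against temporary views ("emails" and "relevant" are just view names I made up for the example; exact syntax and function support depend on your Spark version):
# Sketch only: the same semi-join expressed in SQL. "emails" and "relevant"
# are temporary views registered for this example; array_contains() needs a
# Spark version that exposes it as a SQL function.
email_df.registerTempTable("emails")
relevant_df.registerTempTable("relevant")

result_sql = sql_ctx.sql("""
    SELECT *
    FROM emails e
    LEFT SEMI JOIN relevant r
      ON array_contains(e.recipients, r.address)
      OR array_contains(e.sender, r.address)
""")
result_sql.collect()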
Upvotes: 1