Reputation: 13700
I am using pyspark to run a join of this sort:
rdd1=sc.textFile(hdfs_dir1).map(lambda row: (getKey1(row),getData1(row)))
rdd2=sc.textFile(hdfs_dir2).map(lambda row: (getKey2(row),getData2(row)))
result=rdd1.join(rdd2).collect()
The job executes the first 300 tasks quite fast (~seconds each), and hangs when reaching task 301/308, even when I let it run for days.
I tried to run the pyspark shell with different configurations (number of workers, memory, CPUs, cores, shuffle rates) and the result is always the same.
What could be the cause, and how can I debug it?
Upvotes: 5
Views: 5055
Reputation: 2422
Has anyone been able to solve this problem? My guess is that the issue is caused by shuffling data between executors. I used two ridiculously small datasets (10 records each) with no missing keys, and still the join operation got stuck. I eventually had to kill the instance. The only thing that helped in my case was cache().
If we take the above example:
rdd1=sc.textFile(hdfs_dir1).map(lambda row: (getKey1(row),getData1(row)))
rdd2=sc.textFile(hdfs_dir2).map(lambda row: (getKey2(row),getData2(row)))
# cache it
rdd1.cache()
rdd2.cache()
# I also tried rdd1.collect() and rdd2.collect() to get data cached
# then try the joins
result=rdd1.join(rdd2)
# I would get the answer
result.collect() # it works
I am not able to figure out why caching works, though (apparently it should have worked without cache() too).
Upvotes: 2
Reputation: 1551
You can narrow down whether this is a problem with the collect() call by calling count() instead, to see if the issue is pulling the results into the driver:
result=rdd1.join(rdd2).count()
If the count works, it might be best to add a sample or limit, then call collect() if you're attempting to view the results.
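For example, a minimal sketch of that approach, reusing rdd1 and rdd2 from the question (take(10) and the 1% sample fraction are arbitrary values chosen just for illustration):
joined = rdd1.join(rdd2)
# Does the join itself finish? count() forces the shuffle but keeps the
# results on the executors instead of pulling everything into the driver.
print(joined.count())
# If the count succeeds, inspect only a handful of records rather than
# collecting the full result.
print(joined.take(10))
# Or look at a small random sample (no replacement, roughly 1% of rows).
print(joined.sample(False, 0.01).collect())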
You can also look at the task in the Spark UI to see whether it has been assigned to a particular executor, and use the UI again to look at the executor logs. Within the Executors tab you can take a thread dump of the executor that is handling the task. If you take a few thread dumps and compare them, check whether there is a thread that is hung.
Also look at the driver's log4j logs and the stdout/stderr logs for any additional errors.
Upvotes: 1
Reputation: 3532
collect() will try to fetch the result of your join into the application driver node, and you will run into memory issues.
The join operation causes a lot of shuffling, but you can reduce this by using Bloom filters. You construct a Bloom filter for the keys in one RDD, broadcast it, and use it to filter the other RDD. After applying these operations you should end up with smaller RDDs (assuming the two do not contain exactly the same keys) and your join operation should be much faster.
The Bloom filter can be collected efficiently, since the bits set by one element can be combined with the bits set by another with OR, which is associative and commutative.
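As a rough sketch of that idea in PySpark: the sizing constants, the md5-based hash functions, and the helper names below are made up for illustration (a real job would use a proper Bloom filter library tuned for the expected key count and false-positive rate), and rdd1/rdd2 are the pair RDDs from the question.
import hashlib

# Hypothetical sizing: ~1M bits and 5 hash functions, illustrative only.
NUM_BITS = 1 << 20
NUM_HASHES = 5

def bit_positions(key):
    # Derive NUM_HASHES bit positions for a key from salted md5 digests.
    for i in range(NUM_HASHES):
        digest = hashlib.md5(("%d:%s" % (i, key)).encode("utf-8")).hexdigest()
        yield int(digest, 16) % NUM_BITS

def filter_for_partition(keys):
    # Build one partial Bloom filter (a plain int used as a bit array)
    # from all keys in a partition.
    bits = 0
    for key in keys:
        for pos in bit_positions(key):
            bits |= 1 << pos
    return [bits]

def might_contain(bits, key):
    # False positives are possible, false negatives are not.
    return all(bits & (1 << pos) for pos in bit_positions(key))

# Build one partial filter per partition of rdd1's keys and merge them with
# OR; since OR is associative and commutative, the merge order is irrelevant.
bloom_bits = rdd1.keys().mapPartitions(filter_for_partition).reduce(lambda a, b: a | b)

# Ship the (small) filter to the executors and drop rdd2 rows whose key
# cannot possibly occur in rdd1, before shuffling for the join.
bloom_bc = sc.broadcast(bloom_bits)
rdd2_filtered = rdd2.filter(lambda kv: might_contain(bloom_bc.value, kv[0]))

result = rdd1.join(rdd2_filtered)
Representing each partial filter as a plain integer keeps the per-partition results small and trivially mergeable, and the broadcast variable stays compact compared to shipping the full key set.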
Upvotes: 1