Uri Goren

Reputation: 13700

Spark join always stuck on the same task, how can I debug?

I am using pyspark to run a join of this sort:

rdd1 = sc.textFile(hdfs_dir1).map(lambda row: (getKey1(row), getData1(row)))
rdd2 = sc.textFile(hdfs_dir2).map(lambda row: (getKey2(row), getData2(row)))
result = rdd1.join(rdd2).collect()

The job executes the first 300 tasks quite quickly (a few seconds each), but hangs when it reaches task 301/308, even when I let it run for days.

I have tried running the pyspark shell with different configurations (number of workers, memory, CPUs, cores, shuffle rates), and the result is always the same.

What could be the cause, and how can I debug it?

Upvotes: 5

Views: 5055

Answers (3)

Somum

Reputation: 2422

Has anyone been able to solve this problem? My guess is that the issue is caused by shuffling data between executors. I used two ridiculously small datasets (10 records) with no missing keys, and the join operation still got stuck; I eventually had to kill the instance. The only thing that helped in my case was cache().

If we take the above example:

rdd1 = sc.textFile(hdfs_dir1).map(lambda row: (getKey1(row), getData1(row)))
rdd2 = sc.textFile(hdfs_dir2).map(lambda row: (getKey2(row), getData2(row)))
# cache both RDDs
rdd1.cache()
rdd2.cache()
# I also tried rdd1.collect() and rdd2.collect() to get the data cached
# then try the join
result = rdd1.join(rdd2)
# I would get the answer
result.collect()  # it works

I have not been able to figure out why caching works, though (apparently it should have worked without cache() as well).

Upvotes: 2

MrChristine

Reputation: 1551

You can narrow down whether the problem is with the collect() call by calling count() instead, to see whether the issue is pulling the results into the driver:

result = rdd1.join(rdd2).count()

If the count works, it might be best to add a sample or limit and then call collect() if you are trying to view the results.
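
A quick sketch of that approach, reusing rdd1 and rdd2 from the question (the take(10) size and the 0.01 sample fraction are arbitrary choices for illustration):

result = rdd1.join(rdd2)

# confirm the join itself finishes without pulling everything into the driver
print(result.count())

# look at only a handful of rows instead of collect()-ing the full result
print(result.take(10))

# or sample a small fraction of the result before collecting it
print(result.sample(False, 0.01).collect())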

You can also look at the task in the Spark UI to see whether it has been assigned to a particular executor, and then use the UI to look at that executor's logs. Within the Executors tab you can take a thread dump of the executor that is handling the task; if you take a few thread dumps and compare them, check whether any thread appears to be hung.

Also look at the driver's log4j logs and the stdout/stderr logs for any additional errors.

Upvotes: 1

Radu Ionescu

Reputation: 3532

collect() will try to fetch the entire result of your join into the application driver node, and you can run into memory issues there.

The join operation will cause a lot of shuffling, but you can reduce this by using Bloom filters. You construct a Bloom filter for the keys of one RDD, broadcast it, and use it to filter the other RDD. After applying these operations you should end up with smaller RDDs (provided the two do not contain exactly the same keys), and your join operation should be much faster.

The Bloom filter can be collected efficiently, since the bits set by one element can be combined with the bits set by another with OR, which is associative and commutative.
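
As a rough sketch of that idea (reusing rdd1 and rdd2 from the question, with rdd1 assumed to be the smaller side; the SimpleBloomFilter class, its bit-array size, and the hash count are illustrative choices, not part of any Spark API):

import hashlib

class SimpleBloomFilter(object):
    """Tiny Bloom filter: a fixed bit array plus a few hash positions per key."""
    def __init__(self, num_bits=1 << 20, num_hashes=5):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, key):
        for i in range(self.num_hashes):
            digest = hashlib.md5(("%d:%s" % (i, key)).encode("utf-8")).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))

def add_key(bf, key):
    bf.add(key)
    return bf

def merge_filters(bf_a, bf_b):
    # OR the bit arrays together; OR is associative and commutative,
    # so Spark can merge the per-partition filters in any order
    for i in range(len(bf_a.bits)):
        bf_a.bits[i] |= bf_b.bits[i]
    return bf_a

# build the filter over the keys of the smaller RDD and broadcast it
bloom = rdd1.keys().aggregate(SimpleBloomFilter(), add_key, merge_filters)
bloom_bc = sc.broadcast(bloom)

# drop rows of the other RDD whose keys definitely do not appear in rdd1,
# so far less data has to be shuffled for the join
rdd2_filtered = rdd2.filter(lambda kv: bloom_bc.value.might_contain(kv[0]))

result = rdd1.join(rdd2_filtered)

If both RDDs really do share almost all of their keys, the filter will not remove much, so this mainly pays off when one side contains many keys that are absent from the other.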

Upvotes: 1
