poiuytrez
poiuytrez

Reputation: 22546

Spark join exponentially slow

I am trying to make a join on two Spark RDDs. I have a transaction log which is linked to categories. I have formatted my transaction RDD to have a category id as the key.

transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]

categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]

The transaction log is about 20 GB (350 millions of lines). The category list is less than 1KB.

When I run

transactions_cat.join(categories).count()

Spark begins to be very slow. I have a stage that have 643 tasks. The first 10 tasks took about 1 min. Each tasks is then slower and slower (approximately 15 min around the 60th task). I am not sure what's wrong.

Please check theses screenshots to get a better idea. enter image description here enter image description here enter image description here

I am running Spark 1.1.0 with 4 workers using the python shell for a total Memory of 50 GB. Counting the transactions RDD only is quite fast (30min)

Upvotes: 3

Views: 3963

Answers (1)

Spiro Michaylov
Spiro Michaylov

Reputation: 3571

What's wrong is probably that Spark isn't noticing that you have an easy case of the join problem. When one of the two RDDs you're joining is so small you're better off with it not being an RDD. Then you can roll your own implementation of hash join, which is actually a lot simpler than it sounds. Basically, you need to:

  • Pull your category list out of the RDD using collect() -- the resulting communication will easily pay for itself (or, if possible, don't make it an RDD in the first place)
  • Turn it into a hash table with one entry containing all the values for one key (assuming your keys are not unique)
  • For each pair in your large RDD, look the key up in the hash table and produce one pair for each value in the list (if not found then that particular pair doesn't produce any results)

I have an implementation in Scala -- feel free to ask questions about translating it, but it should be pretty easy.

Another interesting possibility is to try using Spark SQL. I'm pretty sure the project's long term ambitions would include doing this for you automatically, but I don't know if they've achieved it yet.

Upvotes: 7

Related Questions