Reputation: 87
I am using PySpark and am having trouble figuring out why merging two RDDs fails for me when the RDDs contain long strings.
In particular, I have two RDDs with the structure
rdd1 = ([key1, [string11, string12]], [key2, [string21, string22]], ...)
rdd2 = ([key1, [string13, string14, string15]], [key2, [string23, string24, string25]], ...)
where the strings can become quite long (i.e., a few MB each). My final aim is to get a new "merged and flattened" RDD with the content
rdd3 = ([key1, string11, string12, string13, string14, string15], [key2, string21, string22, string23, string24, string25], ...)
For this purpose I use the Python command
rdd3 = sparkContext.union([rdd1, rdd2]).groupByKey() \
    .mapValues(lambda x: list(x)) \
    .map(lambda x: [x[0]] + list(x[1][0]) + list(x[1][1]))
It seems like a simple task and, indeed, this command works well if the strings are small. However, for very long strings the order of the strings within the resulting RDD is suddenly mixed up in a seemingly random way, like
rdd3 = ([key1, string14, string15, string12, string13, string11], [key2, string21, string22, string24, string25, string23], ...)
While the union seems to preserve the order, the mixing must occur somewhere between the groupByKey and the map. The flattening itself does not seem to be the problem, but since groupByKey returns a ResultIterable, it is hard to figure out the details. To sum it up, I have no idea what is actually going on here. Can anybody give me a hint? I am currently running Spark on a local test client with a few workers, if that matters.
Upvotes: 0
Views: 58
Reputation: 35229
What is going on here is a shuffle. The order of values during a shuffle is nondeterministic. In some cases the order can be preserved, but that is not guaranteed, and it is limited to simple cases like local mode.
Unless you keep additional order information and re-sort the values after each shuffle (which is very expensive), there is no workaround.
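If the order matters, a minimal sketch of that workaround could look like the following. It reuses rdd1, rdd2 and sparkContext from the question; the (source, position) tagging scheme is just one possible way to carry the ordering through the shuffle.

def tag(rdd, source_id):
    # Turn (key, [s1, s2, ...]) into (key, [(source_id, 0, s1), (source_id, 1, s2), ...]).
    return rdd.map(lambda kv: (kv[0], [(source_id, i, s) for i, s in enumerate(kv[1])]))

tagged = sparkContext.union([tag(rdd1, 0), tag(rdd2, 1)])

# After grouping, flatten the tagged lists, restore the original order by
# sorting on (source, position), and drop the tags again.
rdd3 = (tagged
        .groupByKey()
        .map(lambda kv: [kv[0]] + [s for _, _, s in
                                   sorted(t for group in kv[1] for t in group)]))

This doubles the data you shuffle (every string carries its tag) and adds a sort per key, which is why it is expensive for multi-MB strings.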
Upvotes: 1