Reputation: 25385
I have recently started using pyspark and I encountered some behavior that I am trying to better understand, and avoid.
Consider the following code:
query1 = "SELECT * FROM A where X >= 1000000 and X < 1001000 LIMIT 50"
s1 = spark.sql(query1)
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()
query2 = "SELECT * FROM B" + " where Y in " + '(' + ','.join([str(x) for x in X_vals]) + ')'
s2 = spark.sql(query2)
s1.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/A.csv')
s2.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/B.csv')
From A, I obtain a sample of 50 records from a range, and store the values of X in X_vals. I then take the same records (where Y is in X_vals) from table B.
Later, I write both tables to csv files. In the resulting csv files, the X's in A no longer match the Y's in B.
I think this behavior is explainable and is caused by lazy evaluation: the records selected by the collect() call are not the same records as the ones written to the .csv files. However, my understanding of Spark is not yet good enough to explain exactly why this happens.
So; why does this happen, and is there a way to force the query to return the same results twice (without joining the tables)?
Thanks,
Florian
Upvotes: 2
Views: 516
Reputation: 35229
The problem is the implementation of LIMIT. It is implemented by shuffling records to a single partition (you can find a detailed explanation in the excellent answer to Towards limiting the big RDD).
At the same time, Spark follows standard SQL rules: if there is no explicit order, the optimizer can choose arbitrary records.
val df = spark.range(1000)
df.where($"id".between(100, 200)).limit(10).explain
== Physical Plan ==
CollectLimit 10
+- *LocalLimit 10
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
+- *Range (0, 1000, step=1, splits=4)
To get a deterministic order (somewhat; AFAIK ties are resolved nondeterministically), add an orderBy clause, which converts CollectLimit into TakeOrderedAndProject:
df.where($"id".between(100, 200)).orderBy("id").limit(10).explain
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[id#16L ASC NULLS FIRST], output=[id#16L])
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
+- *Range (0, 1000, step=1, splits=4)
Upvotes: 3