Florian

Reputation: 25385

Obtain same results when evaluating a Spark SQL query twice with LIMIT

I have recently started using pyspark and I encountered some behavior that I am trying to better understand and avoid.

Consider the following code:

query1 = "SELECT * FROM A where X >= 1000000 and X < 1001000 LIMIT 50"
s1 = spark.sql(query1)
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()

query2 = "SELECT * FROM B" + " where Y in " + '('  + ','.join([str(x) for x in X_vals]) + ')'
s2 = spark.sql(query2)

s1.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/A.csv')
s2.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/B.csv')

From A, I obtain a sample of 50 records from a range, and store the values of X in X_vals. I then take the same records (where Y in X_vals) from table B.

Later, I write both tables to csv files. In the resulting csv files, the X's in A do not match the Y's in B anymore.

I think this behavior is explainable and is caused by lazy evaluation; the records selected by the collect() call are not the same records as the ones written by the .csv() call. However, my understanding of Spark is not yet good enough to explain exactly why this happens.

So: why does this happen, and is there a way to force the query to return the same results twice (without joining the tables)?

Thanks,

Florian

Upvotes: 2

Views: 516

Answers (1)

Alper t. Turker

Reputation: 35229

The problem is the implementation of LIMIT. It is implemented by shuffling records to a single partition (you can find a detailed explanation in the excellent answer to Towards limiting the big RDD).

At the same time, Spark follows SQL standard rules - if there is no explicit order, then the optimizer can choose arbitrary records.

val df = spark.range(1000)

df.where($"id".between(100, 200)).limit(10).explain
== Physical Plan ==
CollectLimit 10
+- *LocalLimit 10
   +- *Filter ((id#16L >= 100) && (id#16L <= 200))
      +- *Range (0, 1000, step=1, splits=4)

To get a deterministic order (somewhat deterministic; AFAIK ties are still resolved nondeterministically), use an orderBy clause, which converts CollectLimit into TakeOrderedAndProject:

df.where($"id".between(100, 200)).orderBy("id").limit(10).explain
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[id#16L ASC NULLS FIRST], output=[id#16L])
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
   +- *Range (0, 1000, step=1, splits=4)

Upvotes: 3
