Reputation: 433
Does anyone know how to do pagination in a Spark SQL query?
I need to use Spark SQL, but I don't know how to do pagination.
Tried:
select * from person limit 10, 10
Upvotes: 13
Views: 15176
Reputation: 63
The cleaner, easier and lazier way is always the best: use offset and limit, introduced recently in Spark 3.4.
fullDF = ...
total = fullDF.count()
counter = 0
pageSize = 10
while counter * pageSize < total:
    pageDF = fullDF.offset(counter * pageSize).limit(pageSize)
    ...
    counter += 1
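Since the question asks for plain Spark SQL, the same paging can, as far as I know, also be written with the OFFSET clause that arrived alongside the DataFrame method in 3.4. A minimal sketch, assuming a SparkSession named spark, a registered table person, and a hypothetical ordering column id (without an ORDER BY the page contents are not deterministic):
# Rows 10-19 of person; OFFSET in SQL requires Spark 3.4+ (assumption)
pageDF = spark.sql("SELECT * FROM person ORDER BY id LIMIT 10 OFFSET 10")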
Upvotes: 1
Reputation: 637
It has been 6 years; I don't know if this was possible back then.
I would add a sequential id to the dataframe and filter for the records between offset and offset + limit.
In pure Spark SQL, for an offset of 10 and a limit of 10, it would be something like this:
WITH count_person AS (
  SELECT *, monotonically_increasing_id() AS count FROM person)
SELECT * FROM count_person WHERE count >= 10 AND count < 20
On PySpark it would be very similar:
import pyspark.sql.functions as F
offset = 10
limit = 10
df = df.withColumn('_id', F.monotonically_increasing_id())
# between() is inclusive on both ends, so subtract 1 from the upper bound
df = df.where(F.col('_id').between(offset, offset + limit - 1))
It's flexible and fast enough even for a big volume of data.
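If a truly gap-free index is needed (monotonically_increasing_id() can leave gaps between partitions), a variant sketch using row_number() over a window works the same way; the ordering column 'id' below is only an assumption:
from pyspark.sql import Window
import pyspark.sql.functions as F
offset = 10
limit = 10
# 'id' is a hypothetical column that defines a stable ordering
w = Window.orderBy('id')
# row_number() yields consecutive values 1, 2, 3, ... with no gaps
df_page = (df.withColumn('_rn', F.row_number().over(w))
             .where(F.col('_rn').between(offset + 1, offset + limit))
             .drop('_rn'))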
Upvotes: 7
Reputation: 944
Please find below a useful PySpark (Python 3 and Spark 3) class named SparkPaging which abstracts the pagination mechanism: https://gitlab.com/enahwe/public/lib/spark/sparkpaging
Here's the usage:
Class for paging dataframes and datasets
- Init example 1:
Approach by specifying a limit.
sp = SparkPaging(initData=df, limit=753)
- Init example 2:
Approach by specifying a number of pages (if there's a remainder, the number of pages is incremented).
sp = SparkPaging(initData=df, pages=6)
- Init example 3:
Approach by specifying a limit.
sp = SparkPaging()
sp.init(initData=df, limit=753)
- Init example 4:
Approach by specifying a number of pages (if there's a remainder, the number of pages is incremented).
sp = SparkPaging()
sp.init(initData=df, pages=6)
- Reset:
sp.reset()
- Iterate example:
print("- Total number of rows = " + str(sp.initDataCount))
print("- Limit = " + str(sp.limit))
print("- Number of pages = " + str(sp.pages))
print("- Number of rows in the last page = " + str(sp.numberOfRowsInLastPage))
while (sp.page < sp.pages-1):
    df_page = sp.next()
    nbrRows = df_page.count()
    print(" Page " + str(sp.page) + '/' + str(sp.pages) + ": Number of rows = " + str(nbrRows))
- Output:
- Total number of rows = 4521
- Limit = 753
- Number of pages = 7
- Number of rows in the last page = 3
Page 0/7: Number of rows = 753
Page 1/7: Number of rows = 753
Page 2/7: Number of rows = 753
Page 3/7: Number of rows = 753
Page 4/7: Number of rows = 753
Page 5/7: Number of rows = 753
Page 6/7: Number of rows = 3
Upvotes: 1
Reputation: 2659
karthik's answer will fail if there are duplicate rows in the dataframe: except removes every row of df that also appears in df1, so duplicates are dropped as well.
val filteredRdd = df.rdd.zipWithIndex().collect { case (r, i) if i >= 10 && i <= 20 => r }
val newDf = sqlContext.createDataFrame(filteredRdd, df.schema)
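A rough PySpark equivalent of the same zipWithIndex trick, sketched under the assumption of an active SparkSession named spark:
# zipWithIndex attaches a 0-based, gap-free index to every row
indexed = df.rdd.zipWithIndex()
page_rows = indexed.filter(lambda pair: 10 <= pair[1] <= 20).map(lambda pair: pair[0])
page_df = spark.createDataFrame(page_rows, df.schema)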
Upvotes: 3
Reputation: 13640
There is no support for offset in Spark SQL as of now. One of the alternatives you can use for paging is DataFrames with the except method.
Example: if you want to iterate with a paging limit of 10, you can do the following:
DataFrame df1;
long count = df.count();
int limit = 10;
while (count > 0) {
    df1 = df.limit(limit);
    df1.show();            // will print 10 rows, then the next 10, etc.
    df = df.except(df1);
    count = count - limit;
}
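A rough PySpark equivalent of the same loop, as a sketch only (like the Java version it recomputes df on every pass, so it gets slow on large data):
count = df.count()
limit = 10
while count > 0:
    page = df.limit(limit)
    page.show()              # prints 10 rows, then the next 10, etc.
    df = df.subtract(page)   # PySpark's counterpart of except
    count -= limit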
If you want to do, say, LIMIT 50, 100 in the first go, you can do the following:
DataFrame df1 = df.limit(50);
DataFrame df2 = df.except(df1);
DataFrame result = df2.limit(100);   // required result
Hope this helps!
Upvotes: 2