simafengyun

Reputation: 433

how to implement spark sql pagination query

Does anyone know how to do pagination in a Spark SQL query?

I need to use spark sql but don't know how to do pagination.

Tried:

select * from person limit 10, 10

Upvotes: 13

Views: 15176

Answers (5)

Barenko

Reputation: 63

The cleaner, easier and lazier way is always the best: use offset and limit, introduced recently in Spark 3.4.

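fullDF = ...
total = fullDF.count()
counter = 0   # zero-based page index
pageSize = 10
# Stop once the rows already covered (counter * pageSize) reach the total row count
while counter * pageSize < total:
   pageDF = fullDF.offset(counter * pageSize).limit(pageSize)
   ...
   counter += 1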

Upvotes: 1

Khanis Rok

Reputation: 637

It has been 6 years, don't know if it was possible back then.

I would add an increasing ID column to the result and select the records whose ID lies between offset and offset + limit.

On pure Spark SQL query it would be something like this, for offset 10 and limit 10

WITH count_person AS (
    SELECT *, monotonically_increasing_id() AS count FROM person)
SELECT * FROM count_person WHERE count >= 10 AND count < 20

On PySpark it would be very similar

import pyspark.sql.functions as F

offset = 10
limit = 10
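df = df.withColumn('_id', F.monotonically_increasing_id())
# between() is inclusive on both ends and would return limit + 1 rows, so use a half-open range
df = df.where((F.col('_id') >= offset) & (F.col('_id') < offset + limit))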

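It's flexible and fast enough even for a big volume of data. Note that monotonically_increasing_id() guarantees unique, increasing IDs but not consecutive ones, so the ID range matches exact row positions only when the data sits in a single partition.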
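To see why an ID range can miss rows once the data spans several partitions, here is a small pure-Python sketch. It assumes the layout described in the Spark documentation for the current implementation of monotonically_increasing_id() (partition index in the upper 31 bits, row position within the partition in the lower 33 bits). The function name and the partition sizes are made up for illustration; no Spark is involved.

```python
def simulated_monotonic_ids(partition_sizes):
    """Mimic the documented ID layout: partition index in the upper 31 bits,
    row position within the partition in the lower 33 bits."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for row_in_partition in range(size):
            ids.append((partition_index << 33) + row_in_partition)
    return ids


# Hypothetical dataset: 24 rows split evenly over 2 partitions.
ids = simulated_monotonic_ids([12, 12])

# "Offset 10, limit 10" expressed as an ID range.
page = [i for i in ids if 10 <= i < 20]

print(len(page))  # only the last two rows of partition 0 match
print(ids[12])    # first ID of partition 1 jumps to 1 << 33
```

With a single partition the IDs are 0, 1, 2, ... and the range filter behaves like a real offset. With more partitions, a consecutive index (for example from row_number() over an explicit ordering, or from zipWithIndex) is the safer choice.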

Upvotes: 7

prossblad

Reputation: 944

Please find below a useful PySpark (Python 3 and Spark 3) class named SparkPaging, which abstracts the pagination mechanism: https://gitlab.com/enahwe/public/lib/spark/sparkpaging

Here's the usage:

SparkPaging

Class for paging dataframes and datasets

Example

- Init example 1:

Approach by specifying a limit.

sp = SparkPaging(initData=df, limit=753)

- Init example 2:

Approach by specifying a number of pages (if the rows do not divide evenly, the number of pages is incremented by one to hold the remainder).

sp = SparkPaging(initData=df, pages=6)

- Init example 3:

Approach by specifying a limit.

sp = SparkPaging()
sp.init(initData=df, limit=753)

- Init example 4:

Approach by specifying a number of pages (if the rows do not divide evenly, the number of pages is incremented by one to hold the remainder).

sp = SparkPaging()
sp.init(initData=df, pages=6)

- Reset:

sp.reset()

- Iterate example:

print("- Total number of rows = " + str(sp.initDataCount))
print("- Limit = " + str(sp.limit))
print("- Number of pages = " + str(sp.pages))
print("- Number of rows in the last page = " + str(sp.numberOfRowsInLastPage))
while (sp.page < sp.pages-1):
    df_page = sp.next()
    nbrRows = df_page.count()
    print("  Page " + str(sp.page) + '/' + str(sp.pages) + ": Number of rows = " + str(nbrRows))

- Output:

- Total number of rows = 4521
- Limit = 753
- Number of pages = 7
- Number of rows in the last page = 3
    Page 0/7: Number of rows = 753
    Page 1/7: Number of rows = 753
    Page 2/7: Number of rows = 753
    Page 3/7: Number of rows = 753
    Page 4/7: Number of rows = 753
    Page 5/7: Number of rows = 753
    Page 6/7: Number of rows = 3
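The numbers in this output follow from simple page arithmetic. The sketch below reproduces them in plain Python; page_layout is a hypothetical helper written for illustration, not part of SparkPaging, and it only assumes that pages are filled to the limit with the remainder going into one extra page.

```python
def page_layout(total_rows, limit):
    """Return (number_of_pages, rows_in_last_page) for a fixed page size."""
    if total_rows == 0:
        return 0, 0
    pages = -(-total_rows // limit)                    # ceiling division
    rows_in_last_page = total_rows - (pages - 1) * limit
    return pages, rows_in_last_page


pages, last_page_rows = page_layout(4521, 753)
print(pages, last_page_rows)  # 7 3
```

Six full pages cover 6 * 753 = 4518 rows, which leaves 3 rows for the seventh page.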

Upvotes: 1

Himaprasoon

Reputation: 2659

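karthik's answer will fail if there are duplicate rows in the DataFrame: except has set semantics (it is equivalent to EXCEPT DISTINCT in SQL), so df.except(df1) drops every copy of any row that also appears in df1. Indexing the rows with zipWithIndex avoids this: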

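// Keep rows whose zero-based index lies in [start, end), i.e. offset 10, limit 10
val (start, end) = (10L, 20L)
val filteredRdd = df.rdd.zipWithIndex().collect { case (r, i) if i >= start && i < end => r }
val newDf = sqlContext.createDataFrame(filteredRdd, df.schema)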
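The difference between the two approaches can be illustrated without Spark. In the pure-Python sketch below, a list stands in for the DataFrame, except_distinct imitates the EXCEPT DISTINCT behaviour of except, and page_by_index imitates filtering on the index produced by zipWithIndex. Both helpers and the sample rows are hypothetical and exist only for this illustration.

```python
rows = ["a", "a", "b", "c", "c", "d"]  # sample data with duplicate rows


def except_distinct(left, right):
    """Set-style difference: drop every copy of a row found in `right`
    and de-duplicate whatever remains (like EXCEPT DISTINCT)."""
    excluded = set(right)
    seen, result = set(), []
    for row in left:
        if row not in excluded and row not in seen:
            seen.add(row)
            result.append(row)
    return result


def page_by_index(data, offset, limit):
    """Select rows by position, the way a zipWithIndex filter does."""
    return [row for i, row in enumerate(data) if offset <= i < offset + limit]


first_page = rows[:2]
remaining = except_distinct(rows, first_page)
print(remaining)  # ['b', 'c', 'd'] : one 'c' has been lost

index_pages = [page_by_index(rows, off, 2) for off in range(0, len(rows), 2)]
print(index_pages)  # [['a', 'a'], ['b', 'c'], ['c', 'd']]
```

Paging by index returns all six rows across the pages, while the except-based loop silently returns fewer rows whenever duplicates exist.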

Upvotes: 3

karthik manchala

Reputation: 13640

There is no support for offset as of now in Spark SQL. One alternative you can use for paging is the except method on DataFrames.

Example: to iterate with a page size of 10, you can do the following:

    DataFrame df1;
    long count = df.count();
    int limit = 10;
    while(count > 0){
        df1 = df.limit(limit);
        df1.show();            //will print 10, next 10, etc rows
        df = df.except(df1);
        count = count - limit;
    }

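If you want the equivalent of LIMIT 50, 100 (skip 50 rows, then take 100) in one go, you can do the following:

        df1 = df.limit(50);                      // the first 50 rows, to be skipped
        DataFrame df2 = df.except(df1);          // everything except those rows
        DataFrame result = df2.limit(100);       // required result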

Hope this helps!

Upvotes: 2
