Jiew Meng

Reputation: 88397

How to batch up items from a PySpark DataFrame

I have a PySpark DataFrame and, for each (batch of) record(s), I want to call an API. So basically, say I have 100000k records; I want to batch up the items into groups of, say, 1000 and call an API for each batch. How can I do this with PySpark? The reason for the batching is that the API probably will not accept a huge chunk of data from a Big Data system.

I first thought of LIMIT, but that won't be "deterministic". Furthermore, it seems like it would be inefficient?

Upvotes: 4

Views: 10171

Answers (3)

Vasudev

Reputation: 1017

If order is not mandatory, then you can use randomSplit() to divide the records into a roughly equal number of batches. Reference here
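A minimal sketch of the randomSplit() approach, assuming a SparkSession named spark and a hypothetical call_api() function:

num_batches = 39
df = spark.range(0, 575641)
# randomSplit with equal weights yields roughly equal-sized, non-overlapping parts
for batch_df in df.randomSplit([1.0] * num_batches, seed=42):
    rows = batch_df.collect()
    # call_api(rows)  # hypothetical API call, one batch at a time

The snippet below instead pages through the DataFrame with offset() and limit() (available in newer PySpark versions), collecting the ids of each batch into a set to check that every record is covered: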

df_count = 575641
batch_size = 15000
num_batches = (df_count + batch_size - 1) // batch_size  # ceiling division

ids_set = set()
orig_df = spark.range(1, df_count)
for i in range(num_batches):
    offset = i * batch_size  # advance the window on every iteration
    batch_df = orig_df.offset(offset).limit(batch_size)
    ids_set = ids_set.union([r[0] for r in batch_df.select('id').collect()])
    print(batch_df.count(), len(ids_set), i)

Upvotes: 1

Kishore

Reputation: 5891

df.foreachPartition { ele =>
  // group each partition's rows into chunks of 1000 and post each chunk
  ele.grouped(1000).foreach { chunk =>
    postToServer(chunk)
  }
}

The code is in Scala; you can do the same in Python. It will create batches of 1000 records within each partition.
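A rough Python equivalent, assuming an existing DataFrame df and a hypothetical post_to_server() function, might look like this:

from itertools import islice

def post_partition(rows):
    # consume the partition iterator in chunks of up to 1000 rows
    while True:
        chunk = list(islice(rows, 1000))
        if not chunk:
            break
        post_to_server(chunk)  # hypothetical API call

df.foreachPartition(post_partition)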

Upvotes: 4

ollik1

Reputation: 4540

Using foreachPartition and then something like this (how to split an iterable in constant-size chunks) to batch the iterable into groups of 1000 is arguably the most efficient way to do it in terms of Spark resource usage.

def handle_iterator(it):
    # batch the iterable and call API
    pass
df.foreachPartition(handle_iterator)

Note: This would make parallel calls to the API from the executors and might not be the way to go in practice if, e.g., rate-limiting is an issue.
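If rate-limiting is a concern, one possible mitigation (not part of the answer above, just a hedged suggestion) is to reduce the number of partitions before calling foreachPartition, which caps how many API-calling tasks can run concurrently:

df.coalesce(4).foreachPartition(handle_iterator)  # at most 4 partitions call the API in parallel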

Upvotes: 3
