Dametime

Reputation: 723

POST requests with JSON data, how to break it up into chunks

I have this script to send many (100,000+) records from a table. I would like to add some error handling and break the data payload up into smaller chunks, sending each chunk via a POST request. My current script works sometimes, but most of the time it returns a heap space error.

import json
import requests
from multiprocessing import Pool

url = "some url"
header = {"Content-Type": "application/json", 'Accept':'application/json'}
api_key="some key"

def send_request(data):
    try:
        res = requests.post(url, headers=header, json={"api_key": api_key, "attributes": data})
        print(res.status_code)
        print(res.json())
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail.
        print(e)

def chunks(data):
    for i in range(0, len(data), 50):
        yield data[i:i + 50]

p = Pool(8)
p.map(send_request, chunks(json.loads(data)))

Upvotes: 4

Views: 2343

Answers (1)

jdaz

Reputation: 6063

From the multiprocessing documentation:

map(func, iterable[, chunksize])

...

Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with explicit chunksize option for better efficiency.

(Emphasis added)

And then:

imap(func, iterable[, chunksize])

A lazier version of map().

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

How great is that? imap does the chunking for you, so you don't have to worry about writing your own chunking function.

So, let's test it!

from multiprocessing import Pool
from datetime import datetime
import sys

# 250,000 records, each holding 5,000 numbers (all referencing the same list)
testObj = [{"data": list(range(5000))}] * 250000

def doFunc(data):
    # arbitrary test function: take the square root of every number;
    # accepts either a single record or a chunk (list) of records
    items = data if isinstance(data, list) else [data]
    for item in items:
        for num in item['data']:
            num ** 0.5

def chunks(data):
    # manual chunking, as in the question
    for i in range(0, len(data), 50):
        yield data[i:i + 50]

# note: on platforms that use the "spawn" start method (Windows, newer macOS),
# creating the Pool and the calls below need an if __name__ == "__main__": guard
p = Pool(8)

# map blocks until every result has been computed and collected into a list
start = datetime.now()
mapTest = p.map(doFunc, chunks(testObj))
print("map took ", datetime.now() - start)
print("sizeof map: ", sys.getsizeof(mapTest))

# imap returns a lazy iterator immediately; the work only happens as results
# are pulled from the iterator, so this measures call overhead, not computation
start = datetime.now()
imapTest = p.imap(doFunc, testObj, 50)
print("imap took ", datetime.now() - start)
print("sizeof imap: ", sys.getsizeof(imapTest))

# imap_unordered behaves the same, but yields results in completion order
start = datetime.now()
imapOTest = p.imap_unordered(doFunc, testObj, 50)
print("imap_unordered took ", datetime.now() - start)
print("sizeof imap_unordered: ", sys.getsizeof(imapOTest))

Results (average of 50 iterations):

map took  0:00:26.61296 
sizeof map:  40072 
imap took  0:00:00.00106
sizeof imap:  128 
imap_unordered took  0:00:00.00108
sizeof imap_unordered:  128

That would be 26.6 seconds vs. 0.001 seconds, though keep in mind that imap and imap_unordered return their lazy iterators almost immediately and only do the work as you consume the results, so the time comparison isn't entirely fair. But more importantly for what you're looking for, look at the memory savings: map builds the entire result list up front, while the imap iterators stay tiny. Pretty significant results in this very unscientific test.

You won't see quite the same time savings since a POST request can't be sped up the way a square root calculation can, but hopefully switching to imap will help you significantly, especially on the memory front.
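
To apply this to your script, here is a minimal sketch (untested against your API; url, header, api_key and the data JSON string are the same placeholders as in your question, and Pool(8) / chunksize=4 are just example numbers). It keeps your chunks() generator so that each POST still carries a batch of 50 records; imap_unordered's chunksize only controls how many of those chunks get handed to a worker process at a time. Looping over the returned iterator is what actually drives the requests:

import json
import requests
from multiprocessing import Pool

url = "some url"                 # placeholders, as in the question
header = {"Content-Type": "application/json", "Accept": "application/json"}
api_key = "some key"
data = "..."                     # your JSON string of records

def send_request(chunk):
    # one POST per chunk of records, same error handling as before
    try:
        res = requests.post(url, headers=header,
                            json={"api_key": api_key, "attributes": chunk})
        print(res.status_code)
    except requests.exceptions.RequestException as e:
        print(e)

def chunks(records, size=50):
    # still batches 50 records per request, as in your current payload
    for i in range(0, len(records), size):
        yield records[i:i + size]

if __name__ == "__main__":
    records = json.loads(data)
    with Pool(8) as p:
        # imap_unordered consumes the generator lazily instead of
        # materializing it up front the way map does; looping over the
        # returned iterator is what actually runs the requests
        for _ in p.imap_unordered(send_request, chunks(records), chunksize=4):
            pass

Because the loop consumes every result before the with block exits, all requests have finished by the time the pool is torn down.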

Upvotes: 4
