Reputation: 723
I have this script to send a large number (100,000+) of records from a table. I'd like to add some error handling and break the payload into smaller chunks, sending each via a POST request. My current script works sometimes, but most of the time it fails with a heap space error.
import json
import requests
from multiprocessing import Pool

url = "some url"
header = {"Content-Type": "application/json", "Accept": "application/json"}
api_key = "some key"

def send_request(data):
    try:
        res = requests.post(url, headers=header, json={"api_key": api_key, "attributes": data})
        print(res.status_code)
        print(res.json())
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail.
        print(e)

def chunks(data):
    for i in range(0, len(data), 50):
        yield data[i:i + 50]

p = Pool(8)
p.map(send_request, chunks(json.loads(data)))
Upvotes: 4
Views: 2343
Reputation: 6063
From the multiprocessing documentation:
map(func, iterable[, chunksize])
...
Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with explicit chunksize option for better efficiency.
(Emphasis added)
And then:
imap(func, iterable[, chunksize])
A lazier version of map().
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
How great is that? imap does the chunking for you, so you don't have to worry about writing your own chunking function.
So, let's test it!
from multiprocessing import Pool
from datetime import datetime
import sys

testObj = [{"data": list(range(5000))}] * 250000

def doFunc(data):
    # arbitrary test function: take the square root of all numbers
    for item in data:
        for num in item['data']:
            num ** 0.5

def chunks(data):
    for i in range(0, len(data), 50):
        yield data[i:i + 50]

p = Pool(8)

# map blocks until every chunk has been processed and returns a full list of results
start = datetime.now()
mapTest = p.map(doFunc, chunks(testObj))
print("map took ", datetime.now() - start)
print("sizeof map: ", sys.getsizeof(mapTest))

# imap returns a lazy iterator almost immediately; its results are never consumed in this test
start = datetime.now()
imapTest = p.imap(doFunc, testObj, 50)
print("imap took ", datetime.now() - start)
print("sizeof imap: ", sys.getsizeof(imapTest))

# same for imap_unordered, which also yields results in whatever order they finish
start = datetime.now()
imapOTest = p.imap_unordered(doFunc, testObj, 50)
print("imap_unordered took ", datetime.now() - start)
print("sizeof imap_unordered: ", sys.getsizeof(imapOTest))
Results (average of 50 iterations):
map took 0:00:26.61296
sizeof map: 40072
imap took 0:00:00.00106
sizeof imap: 128
imap_unordered took 0:00:00.00108
sizeof imap_unordered: 128
That looks like 26.6 seconds vs. 0.001 seconds, but keep the laziness in mind: map blocks until all the work is done, while imap and imap_unordered return their iterators almost immediately and only do the work as you consume the results. More importantly for your problem, look at the memory savings: the map result is a full list held in memory at once, while the imap iterators stay tiny. Pretty significant results in this very unscientific test.
You won't see the same kind of time savings, since a POST request can't be sped up the way a square-root calculation can, but hopefully switching to imap will help you significantly, especially on the memory front.
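To tie that back to your POST script, here is a minimal sketch of what the switch could look like. It keeps your chunks generator, because your goal is 50 records per request and imap's chunksize only controls how many tasks are handed to a worker at a time, not how many records land in one POST. The url, header, api_key, and the data JSON string are the placeholders from your question, and Pool(8), the batch size of 50, and chunksize=4 are assumptions you can tune:

import json
import requests
from multiprocessing import Pool

url = "some url"
header = {"Content-Type": "application/json", "Accept": "application/json"}
api_key = "some key"

def send_request(batch):
    # one POST per batch of records, with the same error handling as before
    try:
        res = requests.post(url, headers=header,
                            json={"api_key": api_key, "attributes": batch})
        return res.status_code
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail on this batch only.
        return str(e)

def chunks(data, size=50):
    # still 50 records per POST, but the batches are generated lazily
    # instead of all being built up front
    for i in range(0, len(data), size):
        yield data[i:i + size]

if __name__ == "__main__":  # guard needed for multiprocessing on spawn-based platforms
    records = json.loads(data)  # `data` is your original JSON string from the question
    with Pool(8) as p:
        # consuming the iterator is what drives the work and surfaces failures
        for result in p.imap_unordered(send_request, chunks(records), chunksize=4):
            print(result)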
Upvotes: 4