joe

Reputation: 73

Parallelize slow API calls via threading

I am running a Python script that calls a function which relies on a slow API, which in turn calls another function that also relies on the same slow API. I would like to speed it up.

What is the best way to do that? The threading module? If so, please provide examples. One thing I noticed about threading is that you can't seem to retrieve a returned value from a thread, and most of my script is written to print the return values of functions.

Here is the code whose I/O performance I am trying to improve:

def get_price_eq(currency, rate):

    if isAlt(currency) == False:
        currency = currency.upper()
        price_eq = 'btc_in_usd*USD_in_'+str(currency)+'*'+str(rate)
        #print price_eq
        return price_eq 
    else:
        currency = currency.lower()
        price_eq = 'poloniex'+ str(currency) + '_close' + '*' + str(rate)
        print(price_eq)
        return price_eq

def get_btcprice_fiat(price_eq):

    query = '/api/equation/' + price_eq
    try:
        conn = api.hmac(hmac_key, hmac_secret)
        btcpricefiat = conn.call('GET', query).json()
    except requests.exceptions.RequestException as e:
        # Without a response there is nothing to convert, so report and re-raise
        print(e)
        raise
    return float(btcpricefiat['data'])

usdbal = float(bal) * get_btcprice_fiat(get_price_eq('USD', 1))
egpbal = float(bal) * get_btcprice_fiat(get_price_eq('EGP', 1))
rsdbal = float(bal) * get_btcprice_fiat(get_price_eq('RSD', 1))
eurbal = float(bal) * get_btcprice_fiat(get_price_eq('EUR', 1))

As you can see, I call get_btcprice_fiat, which calls a slow API from a data vendor, and pass in the result of another function that makes use of another API call. I do this 4+ times. I'm looking for ways to improve the performance of this functionality. Also, I had read that you can't get return values from threads. Most of my code returns values that I then print to the user; how can I work with this?

Upvotes: 1

Views: 1213

Answers (1)

Jeril

Reputation: 8531

Python 3 ships with the concurrent.futures module for launching parallel tasks, which makes this easier.

It provides both thread pooling (ThreadPoolExecutor) and process pooling (ProcessPoolExecutor).

The following examples illustrate both:

ThreadPoolExecutor Example

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
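
On the question of return values: with an executor, whatever the submitted function returns comes back through future.result(), and an exception raised inside the worker is re-raised at that call. A minimal sketch of just that behaviour, where slow_double and always_fails are made-up stand-ins rather than anything from the question:

```python
import concurrent.futures

def slow_double(x):
    # Hypothetical stand-in for any function that returns a value
    return x * 2

def always_fails():
    # Hypothetical stand-in for a call that raises inside the worker thread
    raise ValueError('boom')

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    good = executor.submit(slow_double, 21)
    bad = executor.submit(always_fails)

    # result() blocks until the call has finished, then hands back its return value
    value = good.result()

    # An exception raised in the worker is re-raised here, in the calling thread
    try:
        bad.result()
    except ValueError as exc:
        error_message = str(exc)

print(value)
print(error_message)
```

So code that is written around printing return values can stay that way; only the place where the value is collected changes.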

ProcessPoolExecutor Example

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
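
Which pool to pick depends on the workload. The primality test above is CPU-bound, where separate processes help because threads in a standard CPython build share one global interpreter lock. Waiting on a slow API is I/O-bound, so a thread pool is usually enough. The sketch below illustrates the difference in wall-clock time; fake_api_call is a hypothetical stand-in that just sleeps, not the vendor API from the question:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_api_call(currency):
    # Hypothetical stand-in for a slow network request (assumption: ~0.2 s each)
    time.sleep(0.2)
    return currency.lower()

currencies = ['USD', 'EGP', 'RSD', 'EUR']

# One call after another: total time is roughly the sum of the waits
start = time.perf_counter()
sequential = [fake_api_call(c) for c in currencies]
sequential_time = time.perf_counter() - start

# All calls in flight at once: total time is roughly the longest single wait
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(fake_api_call, currencies))
threaded_time = time.perf_counter() - start

print('sequential: %.2fs, threaded: %.2fs' % (sequential_time, threaded_time))
```

The two result lists are identical because executor.map preserves input order; only the elapsed time differs.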

For Python 2.7 it will be as follows, using the low-level thread module:

import thread
import time

# Define a function for the thread
def print_time(threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print "%s: %s" % (threadName, time.ctime(time.time()))

# Create two threads as follows
try:
   thread.start_new_thread(print_time, ("Thread-1", 2))
   thread.start_new_thread(print_time, ("Thread-2", 4))
except thread.error:
   print "Error: unable to start thread"

# Keep the main thread alive long enough for both threads (5 x 4 s for the
# slower one); otherwise the program exits before they finish
time.sleep(21)

Output:

Thread-1: Thu Jan 22 15:42:17 2009
Thread-1: Thu Jan 22 15:42:19 2009
Thread-2: Thu Jan 22 15:42:19 2009
Thread-1: Thu Jan 22 15:42:21 2009
Thread-2: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:25 2009
Thread-2: Thu Jan 22 15:42:27 2009
Thread-2: Thu Jan 22 15:42:31 2009
Thread-2: Thu Jan 22 15:42:35 2009

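So in your case it will be as follows for Python 3:

import concurrent.futures

data = ['USD', 'EGP', 'RSD', 'EUR']

def helper_func(currency):
    # get_price_eq takes a rate as its second argument; 1 matches the question
    return float(bal) * get_btcprice_fiat(get_price_eq(currency, 1))


def main():
    res_dict = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # executor.map yields results in the same order as data
        for currency, res in zip(data, executor.map(helper_func, data)):
            res_dict[currency] = res
    print(res_dict)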

if __name__ == '__main__':
    main()
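
One caveat with executor.map: if one of the calls raises, the exception surfaces while iterating and the remaining results are lost. If a single failing currency should not sink the others, submit plus as_completed lets each result be handled on its own. A rough sketch under stated assumptions: fetch_rate and FAKE_RATES are hypothetical stand-ins for get_btcprice_fiat(get_price_eq(currency, 1)), and the numbers are invented:

```python
import concurrent.futures

# Hypothetical stand-in data; made-up numbers, not real prices.
# 'RSD' is left out on purpose to simulate one failing call.
FAKE_RATES = {'USD': 2.0, 'EGP': 30.0, 'EUR': 1.5}

def fetch_rate(currency):
    # Hypothetical stand-in for the slow API call; raises KeyError for 'RSD'
    return FAKE_RATES[currency]

def convert_all(bal, currencies):
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Remember which currency each future belongs to
        future_to_currency = {executor.submit(fetch_rate, c): c for c in currencies}
        for future in concurrent.futures.as_completed(future_to_currency):
            currency = future_to_currency[future]
            try:
                results[currency] = float(bal) * future.result()
            except Exception as exc:
                # Record the failure and keep collecting the other results
                errors[currency] = exc
    return results, errors

results, errors = convert_all('2', ['USD', 'EGP', 'RSD', 'EUR'])
print(results)
print(errors)
```

Here results holds the three currencies that succeeded and errors holds the exception for the one that did not.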

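So in your case it will be as follows for Python 2.7:

import thread
import time

data = ['USD', 'EGP', 'RSD', 'EUR']
final_dict = {}

def helper_func(currency):
    # Each thread stores its result in the shared dict, keyed by currency
    final_dict[currency] = float(bal) * get_btcprice_fiat(get_price_eq(currency, 1))

for val in data:
    try:
        # args must be a tuple, hence the trailing comma
        thread.start_new_thread(helper_func, (val,))
    except thread.error:
        print "Error: unable to start thread for %s" % (val)

# The thread module has no join(), so poll until every result has arrived
# (this waits forever if a call fails; add a timeout in real code)
while len(final_dict) < len(data):
    time.sleep(0.1)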
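
If you prefer the higher-level threading module, which has join(), a common pattern for getting values back is to pass each thread a queue and have it put its result there. The sketch below is written for Python 3 (in Python 2.7 the queue module is named Queue); fetch_rate is a hypothetical stand-in for the slow API call and its numbers are invented:

```python
import queue
import threading

def fetch_rate(currency):
    # Hypothetical stand-in for the slow API call; made-up numbers
    return {'USD': 2.0, 'EGP': 30.0, 'RSD': 100.0, 'EUR': 1.5}[currency]

def worker(currency, bal, out):
    # A thread's return value is discarded, so hand the result back via the queue
    out.put((currency, float(bal) * fetch_rate(currency)))

out = queue.Queue()
threads = [threading.Thread(target=worker, args=(currency, '2', out))
           for currency in ['USD', 'EGP', 'RSD', 'EUR']]

for t in threads:
    t.start()
for t in threads:
    t.join()  # wait until every worker has finished

# All workers are done, so draining the queue cannot race with a producer
balances = {}
while not out.empty():
    currency, value = out.get()
    balances[currency] = value

print(balances)
```

Putting a (currency, value) pair on the queue keeps each result tied to its input, since threads may finish in any order.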

Upvotes: 4
