Olivia Xu

Reputation: 11

API doesn't support batch/bulk operations

I have a CSV file with 1.5 million records, and I need to call an API to get each user's email_address. Unfortunately, the API documentation shows it doesn't support batch operations. Currently, processing the 1.5 million records takes about 3-4 hours. Is there a better way to solve this problem? PS: the API rate limit is 300/s.
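At the 300/s rate limit, 1.5 million single-record calls need at least 1,500,000 / 300 = 5,000 seconds, roughly 83 minutes, so the current 3-4 hour run is well short of what the limit allows.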

import csv
import time
import threading
import concurrent.futures

from sailthru.sailthru_error import SailthruClientError
from sailthru.sailthru_client import SailthruClient



# Initialise a semaphore for rate limiting
semaphore = threading.Semaphore(300)

def convert_user_email_for_multithread(sailthru_client, user_id):
    with semaphore:
        try:
            response = sailthru_client.api_get("user", {"id": user_id})
            if response.is_ok():
                body = response.get_body()
                # body is a dictionary; the email lives under body['keys']['email']
                return user_id, body['keys']['email']
            else:
                error = response.get_error()
                print(f"Error for user_id {user_id}: {error.get_message()}")
                print("Status Code:", response.get_status_code())
                print("Error Code:", error.get_error_code())
                return user_id, None  # Return None for the email
        except SailthruClientError as e:
            # Handle client-side exceptions (network errors, bad responses, etc.)
            print(f"Exception for user_id {user_id}: {e}")
            return user_id, None  # Return None for the email
        finally:
            # Brief pause before the semaphore is released, to pace the API calls
            time.sleep(1 / 300)


"""
process the each smaller file  return all the output_files 
num_threads 100 works but still need to be tested to get the best performance
"""
def process_records(sailthru_client,input_file, output_file, num_threads=100):
    start_time = time.time()

    # DictReader is lazy, so materialise the rows and grab the header up front
    with open(input_file, "r", encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file)
        fieldnames = csv_reader.fieldnames
        rows = list(csv_reader)

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Map each submitted future back to its source row
        future_to_row = {
            executor.submit(convert_user_email_for_multithread, sailthru_client, row["Profile Id"]): row
            for row in rows
        }

        with open(output_file, "w", newline="", encoding="utf-8") as output_csv:
            writer = csv.DictWriter(output_csv, fieldnames=fieldnames + ["email_address"],
                                    quotechar='"', quoting=csv.QUOTE_ALL)
            writer.writeheader()

            for future in concurrent.futures.as_completed(future_to_row):
                row = future_to_row[future]
                user_id, email_address = future.result()
                if email_address:  # Only write rows with a valid email address
                    row["email_address"] = email_address
                    writer.writerow(row)
                else:
                    print(f"Couldn't find email for user_id {user_id}. Skipping this row.")

    total_time = time.time() - start_time
    print(f"Total Processing Time: {total_time} seconds")
    








I am using multithreading right now, and I expected the API calls that fetch the user information to be quicker.
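
One variation I am considering (just an untested sketch; RateLimiter and fetch_email_rate_limited are names I made up) is replacing the semaphore plus per-call sleep with a single limiter shared by all threads, so calls are spaced 1/300 s apart globally:

import threading
import time

class RateLimiter:
    """Hands out call slots 1/calls_per_second apart, shared by all threads."""

    def __init__(self, calls_per_second):
        self.interval = 1.0 / calls_per_second
        self.lock = threading.Lock()
        self.next_slot = time.monotonic()

    def wait(self):
        # Reserve the next free slot under the lock, then sleep outside it
        with self.lock:
            now = time.monotonic()
            delay = max(0.0, self.next_slot - now)
            self.next_slot = max(now, self.next_slot) + self.interval
        if delay > 0:
            time.sleep(delay)

rate_limiter = RateLimiter(300)

def fetch_email_rate_limited(sailthru_client, user_id):
    rate_limiter.wait()  # blocks until this call's 1/300 s slot arrives
    response = sailthru_client.api_get("user", {"id": user_id})
    if response.is_ok():
        return user_id, response.get_body()['keys']['email']
    return user_id, None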

Upvotes: 1

Views: 51

Answers (0)
