Reputation: 11
I have a CSV file with 1.5 million records, and I need to call an API to get each user's email_address. Unfortunately, the API documentation shows it doesn't support batch operations. Currently, processing the 1.5 million records takes about 3-4 hours. Is there a better way to solve this problem? PS: the API rate limit is 300 requests/s.
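As a rough back-of-envelope check: at the full 300 requests/s, 1,500,000 calls would take at least 1,500,000 / 300 = 5,000 seconds, i.e. about 83 minutes, so a 3-4 hour run suggests my effective throughput is only around 100-140 requests/s.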
import csv
import time
import threading
import concurrent.futures

from sailthru.sailthru_client import SailthruClient
from sailthru.sailthru_error import SailthruClientError

# Semaphore used for rate limiting: allow at most 300 API requests in flight at once
semaphore = threading.Semaphore(300)
def convert_user_email_for_multithread(sailthru_client, user_id):
    with semaphore:
        try:
            response = sailthru_client.api_get("user", {"id": user_id})
            if response.is_ok():
                body = response.get_body()
                # handle body, which is a dictionary
                return user_id, body['keys']['email']
            else:
                error = response.get_error()
                print(f"Error for user_id {user_id}: {error.get_message()}")
                print("Status Code:", response.get_status_code())
                print("Error Code:", error.get_error_code())
                return user_id, None  # Return None for the email
        except SailthruClientError as e:
            # Handle exceptions
            print(f"Exception for user_id {user_id}: {e}")
            return user_id, None  # Return None for the email
        finally:
            # Pace requests so the 300 requests/s rate limit is respected
            time.sleep(1 / 300)
"""
process the each smaller file return all the output_files
num_threads 100 works but still need to be tested to get the best performance
"""
def process_records(sailthru_client,input_file, output_file, num_threads=100):
start_time = time.time()
rows = []
with open(input_file, "r",encoding='utf-8') as csv_file:
csv_reader = csv.DictReader(csv_file)
for row in csv_reader:
rows.append(row)
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
future_to_row = {executor.submit(convert_user_email_for_multithread, sailthru_client,row["Profile Id"]): row for row in rows}
with open(output_file, "w", newline="",encoding = 'utf-8') as output_csv:
fieldnames = csv_reader.fieldnames + ["email_address"]
writer = csv.DictWriter(output_csv, fieldnames=fieldnames, quotechar='"', quoting=csv.QUOTE_ALL)
writer.writeheader()
for future in concurrent.futures.as_completed(future_to_row):
row = future_to_row[future]
user_id, email_address = future.result()
# row["email_address"] = email_address
# writer.writerow(row)
if email_address: # Only write rows with a valid email address
row["email_address"] = email_address
writer.writerow(row)
else:
print(f"Couldn't find email for user_id {user_id}. Skipping this row.")
total_time = time.time() - start_time
print(f"Total Processing Time: {total_time} seconds")
I am trying multi-threading right now, as shown above, and I expected it to make the API calls that fetch the user information quicker.
Upvotes: 1
Views: 51