Parth Mondal

Reputation: 23

Python: reading CSV data in memory and writing it to Cassandra with multiple threads

I am a bit new to Python. My current code downloads a CSV file and imports it into Cassandra, but as a single thread. Is there a way to create 5 or 10 threads that split the CSV file (rows), read it in parallel, and insert the rows into Cassandra, one row per thread? I am trying to build an equity trading database to store all the tick data, so I am looking for ways to improve the performance of the code. Please just ignore me if the question sounds a bit silly.

    import csv
    import io
    import logging
    import os
    import zipfile

    import requests

    import cassa  # my own Cassandra loader module

    conn = requests.get(url, stream=True)
    if conn.status_code == 200:
        # Unzip the downloaded archive in memory and extract the CSV.
        zfile = zipfile.ZipFile(io.BytesIO(conn.content))
        zfile.extractall()
        with open(csv_file) as csv_d:
            csv_content = csv.reader(csv_d)
            for row in csv_content:
                symbol = row[0]
                stype = row[1]
                openp = row[2]
                highp = row[3]
                lowp = row[4]
                closep = row[5]
                vol = row[8]
                dtime = row[10]
                cassa.main('load', symbol, dtime, stype, openp, highp,
                           lowp, closep, vol)
        # The with-block closes the file, so no explicit close() is needed.
        os.remove(csv_file)
        logging.info("csv file processed successfully")

Thanks & Regards

Upvotes: 0

Views: 1640

Answers (2)

Alex Popescu

Reputation: 4002

If you happen to use the DataStax Python driver, you get an async API in addition to the sync API. Using the async API you can try out a series of different approaches (both are sketched below):

  • batched futures: start a number of async queries in parallel and wait for them to complete; repeat
  • queued futures: add futures to a queue; each time you add a new future to the queue, wait for the oldest one to complete
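
A minimal sketch of both patterns, assuming the DataStax driver (cassandra-driver); the contact point, keyspace, and the tick_data table with its columns are placeholders, not something from the original post:

    from collections import deque
    from cassandra.cluster import Cluster  # DataStax Python driver

    cluster = Cluster(['127.0.0.1'])    # placeholder contact point
    session = cluster.connect('ticks')  # placeholder keyspace

    # Prepared statement; table and column names are assumptions.
    insert = session.prepare(
        "INSERT INTO tick_data "
        "(symbol, dtime, stype, openp, highp, lowp, closep, vol) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)")

    def load_batched(rows, batch_size=100):
        # Batched futures: fire a fixed number of async inserts,
        # wait for all of them to complete, then start the next batch.
        batch = []
        for row in rows:
            batch.append(session.execute_async(insert, row))
            if len(batch) == batch_size:
                for future in batch:
                    future.result()  # blocks until that insert finishes
                batch = []
        for future in batch:
            future.result()

    def load_queued(rows, max_in_flight=100):
        # Queued futures: keep a bounded window of in-flight inserts;
        # when the window is full, wait for the oldest one to finish.
        queue = deque()
        for row in rows:
            if len(queue) >= max_in_flight:
                queue.popleft().result()
            queue.append(session.execute_async(insert, row))
        while queue:
            queue.popleft().result()

Both helpers expect each row to be a tuple matching the bind order of the prepared statement.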

You can find a couple more ideas on how to approach this in this doc.

Upvotes: 3

Jim Meyer

Reputation: 9475

The way I would do this in Java (and I think Python would be similar) is to use a worker thread pool. You would read the CSV file in a single thread as you are doing, but then in the for loop you would dispatch each row to a thread in the thread pool.

The worker threads would do a synchronous insert of their single row and return.

The size of the thread pool controls how many inserts you would have running in parallel. Up to a point, the bigger the worker pool, the faster the import of the whole file will happen (limited by the maximum throughput of the cluster).
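
A minimal sketch of that pattern in Python, using concurrent.futures; the pool size of 10 is arbitrary, and insert_row just wraps the cassa.main call from the question:

    import csv
    from concurrent.futures import ThreadPoolExecutor

    import cassa  # the questioner's own loader module

    def insert_row(row):
        # Synchronous single-row insert, run inside a worker thread.
        cassa.main('load', row[0], row[10], row[1], row[2], row[3],
                   row[4], row[5], row[8])

    # One reader thread (this one) feeds a pool of 10 insert workers.
    with open(csv_file) as csv_d, ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(insert_row, row) for row in csv.reader(csv_d)]
        for future in futures:
            future.result()  # re-raises any exception from a worker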

Another way is to use a single thread and the driver's asynchronous mode to do the inserts. In Java this is called executeAsync (the Python driver's equivalent is session.execute_async); it sends the CQL statement to Cassandra and returns immediately without blocking, so you get the same effect of lots of inserts running in parallel.

You could also look into using the "COPY ... FROM 'file.csv';" CQL command.
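
For reference, the cqlsh form would look like this; the keyspace, table, and column names are placeholders, and WITH HEADER = false assumes the CSV has no header row:

    COPY ticks.tick_data (symbol, dtime, stype, openp, highp, lowp, closep, vol)
    FROM 'file.csv' WITH HEADER = false;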

Upvotes: 0
