Shivangi Singh

Reputation: 1139

Parallel Cassandra requests using Python multiprocessing library errors on wait()

I wrote a piece of multiprocessing code that connects to Cassandra and runs 32 queries to fetch data. I tried to parallelize the fetches using Python's multiprocessing library. The code looks like this:

    import json
    import multiprocessing as mp
    from functools import reduce

    from cassandra.cluster import Cluster
    cluster = Cluster(['xyz'])
    session = cluster.connect()

    query = session.prepare('SELECT stuff')
    session.default_timeout = 600000
    session.default_fetch_size = 100
    queries = [
        session.execute_async(query, ['2021-10-19'] + [i])
        for i in range(32)
    ]
    pool = mp.Pool(32)
    inter_obj = pool.map_async(compute, queries)
    inter_obj.wait()
    res = inter_obj.get()

    pool.close()
    pool.join()
    final_response = reduce(aggregate, res)
    resp = json.dumps(final_response, sort_keys=True, indent=4).encode("utf-8")
    print("RESPONSE", resp)

When I run the program, it errors on the wait():

Traceback (most recent call last):
  File "/usr/local/bin/date-run", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/sc_eol/run_stuff.py", line 75, in main
    res = inter_obj.get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 768, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object
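The failing step is serialization: map_async has to pickle each task to send it to a worker process, and a ResponseFuture holds an internal thread lock, which pickle refuses. A minimal reproduction of the same TypeError (FutureLike is a stand-in class, not the driver's):

```python
import pickle
import threading

class FutureLike:
    """Stand-in for an object that, like ResponseFuture, holds a thread lock."""
    def __init__(self):
        self._lock = threading.RLock()

try:
    pickle.dumps(FutureLike())
except TypeError as exc:
    print(exc)  # cannot pickle '_thread.RLock' object
```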

Upvotes: 1

Views: 682

Answers (1)

Erick Ramirez

Reputation: 16313

execute_async() returns a ResponseFuture object. You're better off building a list of futures with:

futures = []
query = ...
for ... :
    futures.append(session.execute_async(query, ...))

This approach executes the queries concurrently. You can then iterate over the results with:

for future in futures:
    rows = future.result()
    # insert processing here

The call to result() blocks until the request returns either (a) a result or (b) an error.
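The same submit-everything-then-collect pattern, sketched with the standard library's ThreadPoolExecutor standing in for the driver (the fetch function and the 32-partition range are hypothetical, mirroring the question's queries):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(partition):
    # Stand-in for one Cassandra query; with the driver this would be
    # session.execute_async(query, ['2021-10-19', partition]) instead.
    return [partition, partition * 2]

with ThreadPoolExecutor(max_workers=8) as pool:
    # Submit all 32 requests first so they run concurrently...
    futures = [pool.submit(fetch, i) for i in range(32)]
    # ...then collect; result() blocks, just like ResponseFuture.result().
    results = [f.result() for f in futures]

print(len(results))  # → 32
```

Because the queries never leave the driver's own I/O machinery, nothing has to be pickled and sent to another process.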

For details, see the Cassandra Python driver Getting Started guide. Cheers!

Upvotes: 2
