juanschwartz

Reputation: 91

psycopg2: process cursor results with multiple threads or processes

I have a function that queries a large table for the purpose of indexing it. It creates a server-side cursor named "all_accounts":

def get_all_accounts(self):
    cursor = self.get_cursor('all_accounts')
    cursor.execute("SELECT * FROM account_summary LIMIT 20000;")
    return cursor

I then process those records 2,000 at a time to insert them into a NoSQL solution:

def index_docs(self, cursor):
    while True:
        # Consume the result set over a series of iterations,
        # fetching 2000 records per iteration.
        records = cursor.fetchmany(size=2000)

        if not records:
            break

        for r in records:
            # do stuff
            pass

I'd like the index_docs function to consume the fetchmany() results with roughly 10 parallel workers, since my bottleneck is not the target system but rather the single-threaded nature of my script. I have done a few async/worker things in the past, but the psycopg2 cursor seemed like it might be an issue. Thoughts?

Upvotes: 0

Views: 1032

Answers (1)

AKX

Reputation: 168913

I think you'll be safe if a single process/thread accesses the cursor and dishes out work to multiple worker processes that push to the other database. (At a quick glance, server-side cursors can't be shared between connections, but I could be wrong there.)

That is, something like this. Generally you'd use imap_unordered to iterate over a collection of single items (with a higher chunksize than the default 1), but I think we can just as well use the batches here; a sketch of that single-item variant follows the code below.

import multiprocessing

import psycopg2


def get_batches(conn):
    # A named cursor is a server-side cursor in psycopg2.
    cursor = conn.cursor('all_accounts')
    cursor.execute("SELECT * FROM account_summary LIMIT 20000;")
    while True:
        records = cursor.fetchmany(size=500)
        if not records:
            break
        yield records


def process_batch(batch):
    # (this function is run in child processes)
    for r in batch:
        pass  # do stuff with r
    return "some arbitrary result"


def main():
    conn = psycopg2.connect(...)  # fill in connection parameters
    with multiprocessing.Pool() as p:
        batch_generator = get_batches(conn)
        for result in p.imap_unordered(process_batch, batch_generator):
            print(result)  # doesn't really matter


if __name__ == '__main__':
    main()
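
For comparison, the single-item variant mentioned above could look roughly like this. It's only a sketch (the row-handling body and the chunksize value are placeholders, and it assumes your rows pickle cleanly for transfer to the workers): the cursor is still consumed by the parent process, and chunksize does the batching for you at the IPC level.

import multiprocessing

import psycopg2


def get_rows(conn):
    # Same server-side cursor, but yield individual rows
    # instead of explicit batches.
    cursor = conn.cursor('all_accounts')
    cursor.execute("SELECT * FROM account_summary LIMIT 20000;")
    while True:
        records = cursor.fetchmany(size=500)
        if not records:
            break
        yield from records


def process_row(row):
    # (runs in a child process)
    return "some arbitrary result"


def main():
    conn = psycopg2.connect(...)  # fill in connection parameters
    with multiprocessing.Pool() as p:
        # chunksize groups rows into chunks for transfer to the
        # workers, much like the explicit batches above.
        for result in p.imap_unordered(process_row, get_rows(conn), chunksize=500):
            print(result)


if __name__ == '__main__':
    main()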

Upvotes: 2
