Reputation: 91
I have a function that queries a large table for the purposes of indexing it... It creates a server-side cursor named "all_accounts".
def get_all_accounts(self):
    cursor = self.get_cursor('all_accounts')
    cursor.execute("SELECT * FROM account_summary LIMIT 20000;")
    return cursor
I then process those 2,000 or so at a time to insert into a NoSQL solution:
def index_docs(self, cursor):
    while True:
        # consume the result over a series of iterations,
        # with each iteration fetching 2000 records
        record_count = cursor.rowcount
        records = cursor.fetchmany(size=2000)
        if not records:
            break
        for r in records:
            # do stuff
            pass
I'd like index_docs() to consume the fetchmany() calls in parallel (say, 10 workers), since the bottleneck isn't the target system but the single-threaded nature of my script. I have done a few async/worker things in the past, but the psycopg2 cursor seemed like it might be an issue. Thoughts?
Upvotes: 0
Views: 1032
Reputation: 168913
I think you'll be safe if a single process/thread accesses the cursor and dishes out work to multiple worker processes that push to the other database. (At a quick glance, server-side cursors can't be shared between connections, but I could be wrong there.)
That is, something like this. Generally you'd use imap_unordered to iterate over a collection of single items (and use a higher chunksize than the default 1), but I think we can just as well use the batches here... (a single-row variant is sketched after the example below).
import multiprocessing

def get_batches(conn):
    # single reader: only this process touches the server-side cursor
    cursor = conn.get_cursor('all_accounts')
    cursor.execute("SELECT * FROM account_summary LIMIT 20000;")
    while True:
        records = cursor.fetchmany(size=500)
        if not records:
            break
        yield list(records)

def process_batch(batch):
    # (this function is run in child processes)
    for r in batch:
        # ... index the record here
        pass
    return "some arbitrary result"

def main():
    conn = connect...()  # however you open your connection
    with multiprocessing.Pool() as p:
        batch_generator = get_batches(conn)
        for result in p.imap_unordered(process_batch, batch_generator):
            print(result)  # doesn't really matter
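For completeness, here's a rough sketch of the single-row variant I mentioned above: feed individual rows to imap_unordered and let chunksize do the batching for you. It leans on psycopg2's named-cursor iteration, which streams rows from the server in chunks of cursor.itersize (2000 by default). get_rows and process_row are made-up names, and conn.cursor('all_accounts') assumes a plain psycopg2 connection rather than your get_cursor() wrapper.

def get_rows(conn):
    # hypothetical single-row producer using a psycopg2 named (server-side) cursor
    cursor = conn.cursor('all_accounts')
    cursor.execute("SELECT * FROM account_summary LIMIT 20000;")
    for row in cursor:  # fetched from the server cursor.itersize rows at a time
        yield row

def process_row(row):
    # runs in a child process; index a single record
    return "some result"

def main():
    conn = connect...()  # however you open your connection
    with multiprocessing.Pool() as p:
        # chunksize=500 ships rows to the workers 500 at a time
        for result in p.imap_unordered(process_row, get_rows(conn), chunksize=500):
            pass

Whether the batch version or the per-row version works better mostly comes down to how expensive a single "do stuff" iteration is; either way the cursor stays in the parent process.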
Upvotes: 2