hans-t

Reputation: 3223

How do I continue iterating without having to wait for the output to finish?

import sqlite3
conn = sqlite3.connect('output.db')

count = 0
items = []
for item in InfStream: # assume I have an infinite stream
    items.append((item,))
    count += 1
    if count == 10000:
        conn.executemany("INSERT INTO table VALUES (?)", items)
        conn.commit()
        items = []
        count = 0

In this Python code, I have a stream of unknown length called InfStream coming from an API, and I would like to insert the items from the stream into a table in a SQLite database. I first collect a list of 10,000 items and then insert them into the db using executemany. Collecting 10,000 items takes around 1 hour. However, the code has a problem: while executemany is running, I have to wait around 15 seconds for it to finish. This is not acceptable in my case, because I need to keep getting items from the stream, or otherwise it will be disconnected if I delay too long.

I would like the loop to keep running while executemany is executing at the same time. Is it possible to do so?

N.B. The input is far slower than the write: 10,000 items from the input take around 1 hour, while writing them out takes only 15 seconds.

Upvotes: 1

Views: 336

Answers (2)

Abhijit

Reputation: 63737

This is a classic producer-consumer problem that is best handled using a Queue.

The Producer in this case is your InfStream, and the consumer is everything within your for block.

It would be straightforward to convert your sequential code to a multi-threaded producer-consumer model, using a Queue to dispatch data between the threads.

Consider your code:

import sqlite3
conn = sqlite3.connect('output.db')

count = 0
items = []
for item in InfStream: # assume I have an infinite stream
    items.append((item,))
    count += 1
    if count == 10000:
        conn.executemany("INSERT INTO table VALUES (?)", items)
        conn.commit()
        items = []
        count = 0

Create a consumer function to consume the data:

def consumer(q):
    def helper():
        # Note: conn must be usable from this worker thread, e.g. opened
        # with check_same_thread=False or created inside this thread.
        while True:
            # Pull items off the queue in batches of 10,000 and write them out.
            items = [(q.get(),) for _ in range(10000)]
            conn.executemany("INSERT INTO table VALUES (?)", items)
            conn.commit()
    return helper

And a producer function to keep producing ad infinitum:

from queue import Queue      # Queue (capital Q) module on Python 2
from threading import Thread

def producer():
    q = Queue()
    t = Thread(target=consumer(q))
    t.daemon = True
    t.start()
    for item in InfStream:
        q.put(item)
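
For completeness, here is a minimal self-contained sketch of how the two pieces could be wired together. This is not the exact code above: it uses a hypothetical inf_stream() generator in place of the real InfStream, a hypothetical table named items, and it creates the sqlite3 connection inside the worker thread, since by default a connection may only be used from the thread that created it.

import sqlite3
from queue import Queue
from threading import Thread

BATCH_SIZE = 10000

def consume(q):
    # The worker owns its own connection; sqlite3 objects are restricted
    # to the thread that created them unless check_same_thread=False is used.
    conn = sqlite3.connect('output.db')
    conn.execute("CREATE TABLE IF NOT EXISTS items (value)")
    while True:
        batch = [(q.get(),) for _ in range(BATCH_SIZE)]
        conn.executemany("INSERT INTO items VALUES (?)", batch)
        conn.commit()

def inf_stream():
    # Hypothetical stand-in for the real API stream.
    i = 0
    while True:
        yield i
        i += 1

if __name__ == '__main__':
    q = Queue()
    Thread(target=consume, args=(q,), daemon=True).start()
    for item in inf_stream():
        q.put(item)   # the reader never blocks on the database write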

Additional notes in response to the comments:

  1. Theoretically, the queue can scale to an infinite size, limited only by system resources.
  2. If the consumer cannot keep pace with the producer:
     - Spawn multiple consumers.
     - Cache the data on a faster IO device and flush it to the database later.
     - Make the count configurable and dynamic (see the bounded-queue sketch below).
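
One simple way to cap memory use, offered here as a sketch rather than as part of the answer above: Queue accepts a maxsize argument, and a full queue makes put() block (or raise queue.Full when a timeout is given), which throttles the producer instead of letting the backlog grow without bound.

from queue import Queue, Full

# Bounded queue: once 50,000 items are waiting, put() blocks until the
# consumer catches up, so memory use stays capped.
q = Queue(maxsize=50000)

def put_with_backpressure(q, item):
    try:
        q.put(item, timeout=1)   # wait up to 1 second for free space
    except Full:
        pass                     # or spill to a local file instead of dropping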

Upvotes: 4

merlin2011

Reputation: 75565

It sounds like executemany is blocked on IO, so threading might actually help here, and I would try that first. In particular, create a separate thread that simply calls executemany on data that the first thread throws onto a shared queue. Then the first thread can keep reading while the second thread does the executemany. As the other answer pointed out, this is a producer-consumer problem.

If that does not solve the problem, switch to multiprocessing.

Note that if your input is flowing in more quickly than you can write in the second thread or process, then neither solution will work, because you will fill up memory faster than you can empty it. In that case, you will have to throttle the input reading rate, regardless.
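
If it does come to multiprocessing, a minimal sketch of the same producer-consumer layout with a separate writer process might look like the following. The inf_stream() generator and the items table are hypothetical stand-ins, and the bounded queue provides the throttling mentioned above.

import sqlite3
from multiprocessing import Process, Queue

BATCH_SIZE = 10000

def writer(q):
    # The writer process owns its own sqlite3 connection.
    conn = sqlite3.connect('output.db')
    conn.execute("CREATE TABLE IF NOT EXISTS items (value)")
    while True:
        batch = [(q.get(),) for _ in range(BATCH_SIZE)]
        conn.executemany("INSERT INTO items VALUES (?)", batch)
        conn.commit()

def inf_stream():
    # Hypothetical stand-in for the real stream.
    i = 0
    while True:
        yield i
        i += 1

if __name__ == '__main__':
    q = Queue(50000)              # bounded: a slow writer throttles the reader
    Process(target=writer, args=(q,), daemon=True).start()
    for item in inf_stream():
        q.put(item)               # blocks if the writer falls too far behind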

Upvotes: 0
