Reputation: 3223
import sqlite3
conn = sqlite3.connect('output.db')
count = 0
items = []
for item in InfStream:  # assume I have an infinite stream
    items.append((item,))
    count += 1
    if count == 10000:
        conn.executemany("INSERT INTO table VALUES (?)", items)
        conn.commit()
        items = []
        count = 0  # reset so the next batch of 10,000 can be collected
In this Python code, I have a stream of unknown length called InfStream from an API, and I would like to insert the items from the stream into a table in an SQLite database. I first collect 10,000 items in a list and then insert them into the db using executemany. Collecting those 10,000 items takes around 1 hour. However, the code has a problem: while executemany is running, I have to wait around 15 seconds for it to finish. This is not acceptable in my case, because I need to keep getting items from the stream, otherwise the connection will be dropped if I delay too long.
I would like the loop to keep running while executemany executes at the same time. Is it possible to do so?
NB: the input is far slower than the write; 10,000 items take around 1 hour to arrive from the input, while writing them out takes only about 15 seconds.
Upvotes: 1
Views: 336
Reputation: 63737
This is a classic Producer–consumer problem that can best be handled using Queue.
The Producer in this case is your InfStream, and the consumer is everything within your for block.
It would be straightforward to convert your sequential code into a multi-threaded Producer-Consumer model, using a Queue to dispatch data between the threads.
Consider your code:
import sqlite3
conn = sqlite3.connect('output.db')
count = 0
items = []
for item in InfStream:  # assume I have an infinite stream
    items.append((item,))
    count += 1
    if count == 10000:
        conn.executemany("INSERT INTO table VALUES (?)", items)
        conn.commit()
        items = []
        count = 0  # reset so the next batch of 10,000 can be collected
Create a consumer function to consume the data:
def consumer(q):
    def helper():
        # sqlite3 connections can only be used from the thread that created
        # them, so the consumer thread opens its own connection
        conn = sqlite3.connect('output.db')
        while True:
            items = [(q.get(),) for _ in range(10000)]
            conn.executemany("INSERT INTO table VALUES (?)", items)
            conn.commit()
    return helper
And a producer function to produce it ad infinitum:
from queue import Queue   # on Python 2: from Queue import Queue
from threading import Thread

def producer():
    q = Queue()
    t = Thread(target=consumer(q))
    t.daemon = True
    t.start()
    for item in InfStream:
        q.put(item)
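To start the pipeline, it should be enough to call producer() from the main program; a minimal sketch, assuming InfStream is already available in scope:

# producer() starts the consumer thread and then blocks forever,
# feeding items from InfStream into the queue
if __name__ == '__main__':
    producer()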
Additional Notes in response to the comments
If the consumer cannot keep pace with the producer:
Spawn multiple consumers.
Cache the data on a faster IO device and flush it to the database later.
Make the count configurable and dynamic.
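As a rough sketch of those suggestions (not code from the question: BATCH_SIZE, NUM_CONSUMERS, FLUSH_TIMEOUT, and the stream_items table name are all hypothetical placeholders):

import sqlite3
from queue import Queue, Empty
from threading import Thread

BATCH_SIZE = 10000      # make the count configurable
NUM_CONSUMERS = 2       # spawn multiple consumers if one cannot keep up
FLUSH_TIMEOUT = 5.0     # flush a partial batch after this many quiet seconds

def consumer(q):
    # each thread gets its own connection; the timeout lets concurrent
    # writers wait for SQLite's write lock instead of failing immediately
    conn = sqlite3.connect('output.db', timeout=30)
    while True:
        items = [(q.get(),)]                  # block until at least one item
        try:
            while len(items) < BATCH_SIZE:
                items.append((q.get(timeout=FLUSH_TIMEOUT),))
        except Empty:
            pass                              # timed out: flush what we have
        conn.executemany("INSERT INTO stream_items VALUES (?)", items)
        conn.commit()

def start_consumers(q):
    for _ in range(NUM_CONSUMERS):
        Thread(target=consumer, args=(q,), daemon=True).start()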
Upvotes: 4
Reputation: 75565
It sounds like executemany is blocked on IO, so threading might actually help here; I would try that first. In particular, create a separate thread which will simply call executemany on data that the first thread throws onto a shared queue. Then, the first thread can keep reading, while the second thread does the executemany. As the other answer pointed out, this is a Producer-Consumer problem.
If that does not solve the problem, switch to multiprocessing.
Note that if your input is flowing in more quickly than you can write in the second thread or process, then neither solution will work, because you will fill up memory faster than you can empty it. In that case, you will have to throttle the input reading rate, regardless.
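One simple way to throttle, sketched here with an illustrative maxsize rather than a tuned value, is to bound the shared queue so that the reader blocks once the writer falls too far behind:

from queue import Queue

# put() blocks once 50,000 items are waiting, pausing the reader
# until the writer thread catches up (back-pressure)
q = Queue(maxsize=50000)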
Upvotes: 0