Reputation: 2858
I would like to copy a file from a separate thread or process (whichever is faster), so as not to block the main thread.
I'd also like to get an occasional progress update.
With a "regular" worker thread, I can push the progress stat to a Queue
, and check on it from the main (UI) thread.
How would I go about this with concurrent.futures
?
Upvotes: 4
Views: 1857
Reputation: 44043
Since copying files is I/O bound, lightweight threads are all you need; there is no reason to incur the overhead of creating processes. You could also create a second thread solely to monitor the progress of the copying, as I have done here, but that is optional.
import os
import queue
import concurrent.futures

CHUNKSIZE = 1000000

def file_copyer(in_file, out_file, q):
    # Copy in_file to out_file in chunks, putting percent-complete values on q.
    with open(in_file, 'rb') as f_in, open(out_file, 'wb') as f_out:
        in_size = f_in.seek(0, os.SEEK_END)  # seek returns the new offset, i.e. the file size
        f_in.seek(0, os.SEEK_SET)
        if in_size == 0:
            q.put(100)  # empty file: signal completion so the monitor can exit
            return
        data_read = 0
        while True:
            data = f_in.read(CHUNKSIZE)
            if data == b'':
                break
            f_out.write(data)
            data_read += len(data)
            percentage = int(round((data_read / in_size) * 100, 0))
            q.put(percentage)

def progress_thread(q):
    # Block on the queue and print each update; 100 means the copy is done.
    while True:
        percentage = q.get()
        print(f'{percentage}% complete.')
        if percentage == 100:
            return

# this version uses a progress bar:
def progress_bar_thread(q):
    import sys

    WIDTH = 40
    while True:
        percentage = q.get()
        x = int(percentage / 100 * WIDTH)
        sys.stdout.write("Copying [%s%s] %i%%/100%% complete\r" % ("#" * x, "." * (WIDTH - x), percentage))
        sys.stdout.flush()
        if percentage == 100:
            return

def main():
    q = queue.Queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        f1 = executor.submit(file_copyer, 'IMG_1413.jpg', 'temp.jpg', q)
        f2 = executor.submit(progress_thread, q)
        #f2 = executor.submit(progress_bar_thread, q)
        f1.result()  # wait for completion
        f2.result()  # wait for completion
    # the with block calls executor.shutdown() automatically

main()
The function file_copyer copies the input file to the output file in discrete chunks, and after each write it computes the percentage completed and puts that value on the queue passed as an argument. These completion percentages can be read from the queue either by the main thread or, as in this case, by progress_thread, which runs in its own thread. Since progress_thread does nothing but monitor the progress, it can make a blocking q.get call and wait for the next completion percentage to arrive on the queue. If the main thread is doing the monitoring, it should probably use the non-blocking q.get_nowait call instead. When the value 100 (percent) is retrieved, the progress-monitoring thread can return, since that implies the copying has completed.
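For reference, here is a minimal sketch of what that main-thread polling could look like; the main_polling name is mine, and it reuses file_copyer and the imports from the code above:
import queue
import concurrent.futures
import time

def main_polling():
    q = queue.Queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(file_copyer, 'IMG_1413.jpg', 'temp.jpg', q)
        while not future.done():
            try:
                percentage = q.get_nowait()  # non-blocking check
                print(f'{percentage}% complete.')
            except queue.Empty:
                time.sleep(0.05)  # a real UI would run its event loop here instead
        while not q.empty():  # drain any final updates
            print(f'{q.get_nowait()}% complete.')
        future.result()  # re-raise any exception from the copy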
It should be noted that with only two well-defined threads (or just one, if the monitoring is done by the main thread), one could just as well forgo the concurrent.futures module, which is excellent when you need thread pooling but is not really needed here, and simply use the threading.Thread class.
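A minimal sketch of that alternative, reusing the file_copyer and progress_thread functions from above:
import queue
import threading

q = queue.Queue()
copier = threading.Thread(target=file_copyer, args=('IMG_1413.jpg', 'temp.jpg', q))
monitor = threading.Thread(target=progress_thread, args=(q,))
copier.start()
monitor.start()
copier.join()   # wait for the copy to finish
monitor.join()  # returns once the monitor has seen 100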
Upvotes: 1
Reputation: 2773
Alternate Approach
A background_runner decorator/annotation for running background threads/tasks. background_pools holds the currently running threads and their progress; each task's __context dict holds its progress.
import threading
from collections import defaultdict
import time

background_pools = defaultdict(dict)

def background_runner(_func=None, *, pool_max=1, pool_name='default'):
    def main_wrapper(task):  # the actual decorator; task is the decorated function
        pool_size = pool_max
        global background_pools
        # defaultdict returns a fresh empty pool if this name is not known yet.
        pool = background_pools[pool_name]
        print("Pool name is:", pool_name)
        print("Pool size is:", pool_size)

        def task_wrapper(*args, **kwargs):  # the decorated replacement for task
            def task_in_thread():
                thread_id = threading.current_thread().ident
                context = {}
                pool[thread_id] = {"thread": threading.current_thread(), "context": context}
                try:
                    return task(*args, **kwargs, __context=context)
                finally:
                    pool.pop(thread_id, None)  # remove this thread from the pool

            if len(pool) < pool_size:
                threading.Thread(target=task_in_thread).start()
                print("Task:'{}' is in process.".format(pool_name))
            else:
                print(f"Only {pool_size} task:{pool_name} can run at a time.")

        return task_wrapper

    if _func is None:
        # decorator was used with named arguments, e.g. @background_runner(pool_max=3)
        return main_wrapper
    else:
        # decorator was used without arguments, e.g. @background_runner
        return main_wrapper(_func)
Testing the background_runner decorator using time.sleep; the __context dict is used to update progress.
@background_runner(pool_max=3, pool_name='sleep_test')
def sleep_test(__context={}):
    __context['time'] = 0
    for index in range(0, 20):
        time.sleep(2)
        __context['time'] += 2
Calls to the test method:
sleep_test()
time.sleep(10)
print(background_pools)
sleep_test()
time.sleep(10)
print(background_pools)
time.sleep(10)
sleep_test()
sleep_test()
print(background_pools)
time.sleep(10)
print(background_pools)
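While tasks are running, the live progress can also be read directly out of background_pools; a minimal sketch (the 'time' key is the one sleep_test writes into its context):
for thread_id, entry in background_pools['sleep_test'].items():
    # entry['context'] is the dict that the running task mutates in place
    print(f"thread {thread_id}: {entry['context'].get('time', 0)} seconds slept")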
Upvotes: 2