Jay

Reputation: 2858

Getting progress update from concurrent.futures

I would like to copy a file from a separate thread or process (whichever is faster), so as not to block the main thread.

I'd also like to get an occasional progress update.

With a "regular" worker thread, I can push the progress stat to a Queue, and check on it from the main (UI) thread.

How would I go about this with concurrent.futures?

Upvotes: 4

Views: 1857

Answers (2)

Booboo

Reputation: 44043

Since copying files is I/O bound, lightweight threads are all you need; there is no reason to incur the overhead of creating processes. You could also create a second thread solely to monitor the progress of the copying, as I have done here, but that is optional.

import os
import sys
import queue
import concurrent.futures

CHUNKSIZE = 1_000_000

def file_copyer(in_file, out_file, q):
    """Copy in_file to out_file in chunks, putting percent-complete values on q."""
    with open(in_file, 'rb') as f_in, open(out_file, 'wb') as f_out:
        in_size = f_in.seek(0, os.SEEK_END)  # seek() returns the new offset, i.e. the file size
        f_in.seek(0, os.SEEK_SET)
        if in_size == 0:
            q.put(100)  # nothing to copy; still signal completion so the monitor can exit
            return
        data_read = 0
        while True:
            data = f_in.read(CHUNKSIZE)
            if data == b'':
                break
            f_out.write(data)
            data_read += len(data)
            # Integer division reaches 100 only when the copy is actually done,
            # so the monitor cannot be tricked into exiting early by rounding.
            percentage = data_read * 100 // in_size
            q.put(percentage)

def progress_thread(q):
    while True:
        percentage = q.get()  # a blocking get is fine in a dedicated monitor thread
        print(f'{percentage}% complete.')
        if percentage == 100:
            return

# this version draws a progress bar instead:
def progress_bar_thread(q):
    WIDTH = 40
    while True:
        percentage = q.get()
        x = int(percentage / 100 * WIDTH)
        sys.stdout.write("Copying [%s%s] %i%% complete\r" % ("#" * x, "." * (WIDTH - x), percentage))
        sys.stdout.flush()
        if percentage == 100:
            sys.stdout.write('\n')  # leave the finished bar on its own line
            return

def main():
    q = queue.Queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        f1 = executor.submit(file_copyer, 'IMG_1413.jpg', 'temp.jpg', q)
        f2 = executor.submit(progress_thread, q)
        #f2 = executor.submit(progress_bar_thread, q)
        f1.result()  # wait for completion and re-raise any exception from the worker
        f2.result()  # wait for the monitor to see 100% and exit
    # no explicit executor.shutdown() needed: leaving the with block does it

main()

The function file_copyer copies the input file to the output file in discrete chunks, and after each write it computes the percentage completed and puts that value on the queue passed as an argument. These completion percentages can be read from the queue either by the main thread or, as here, by progress_thread, which runs in its own thread. Since progress_thread does nothing but monitor the progress, it can issue a blocking q.get() call and wait for the next percentage value to arrive. If the main thread is doing the monitoring, it should probably issue the non-blocking q.get_nowait() call instead. When the value 100 (percent) is retrieved, the monitoring thread can return, since that implies the copying has completed.
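
If the monitoring does live on the main thread (for example, in a UI event loop), the polling might look like this minimal sketch; poll_progress is a hypothetical helper standing in for whatever periodic timer callback your UI toolkit provides:

import queue

def poll_progress(q):
    """Drain whatever percentages have arrived without blocking the UI."""
    try:
        while True:
            percentage = q.get_nowait()
            print(f'{percentage}% complete.')  # or update a UI progress widget
            if percentage == 100:
                return True  # copy finished; stop polling
    except queue.Empty:
        return False  # nothing new yet; poll again on the next timer tick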

It should be noted that with only two well-defined threads (or just one, if the monitoring is done by the main thread), one could just as well forgo the concurrent.futures module, which shines when you need thread pooling, something that is not really the case here. One can simply use the threading.Thread class directly, as sketched below.
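
For completeness, a minimal sketch of that plain-threading variant, reusing the file_copyer and progress_thread functions (and imports) above:

import threading

def main():
    q = queue.Queue()
    copier = threading.Thread(target=file_copyer, args=('IMG_1413.jpg', 'temp.jpg', q))
    monitor = threading.Thread(target=progress_thread, args=(q,))
    copier.start()
    monitor.start()
    copier.join()   # wait for the copy to finish
    monitor.join()  # the monitor exits on its own once it sees 100%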

Upvotes: 1

Sheikh Abdul Wahid

Reputation: 2773

Alternate Approach

The background_runner decorator runs the decorated function as a background thread/task. background_pools holds the currently running threads and their progress; __context holds each task's progress.

import threading
from collections import defaultdict
import time

background_pools = defaultdict(dict)

def background_runner(_func=None, *, pool_max=1, pool_name='default'):
    def main_wrapper(task):  # the actual decorator; task is the decorated function
        pool_size = pool_max
        global background_pools

        # defaultdict returns a fresh empty dict if the pool does not exist yet.
        pool = background_pools[pool_name]

        print("Pool name is:", pool_name)
        print("Pool size is:", pool_size)

        def task_wrapper(*args, **kwargs):  # the decorated replacement for task (_func)
            def task_in_thread():
                thread_id = threading.current_thread().ident
                context = {}
                pool[thread_id] = {"thread": threading.current_thread(), "context": context}
                try:
                    return task(*args, **kwargs, __context=context)
                finally:
                    # Remove this thread's entry once the task finishes.
                    pool.pop(thread_id, None)

            # NOTE: the size check happens before the new thread registers itself,
            # so a rapid burst of calls can briefly exceed the limit.
            if len(pool) < pool_size:
                threading.Thread(target=task_in_thread).start()
                print(f"Task '{pool_name}' is in progress.")
            else:
                print(f"Only {pool_size} task(s) of '{pool_name}' can run at a time.")
        return task_wrapper

    if _func is None:
        # decorator was used with named arguments, e.g. @background_runner(pool_max=3)
        return main_wrapper
    else:
        # decorator was used without arguments, e.g. @background_runner
        return main_wrapper(_func)

Testing the background_runner decorator using time.sleep; __context is used to record progress.

@background_runner(pool_max=3, pool_name='sleep_test')
def sleep_test(__context={}):
    # the decorator injects a fresh dict as __context for each run
    __context['time'] = 0
    for index in range(0, 20):
        time.sleep(2)
        __context['time'] += 2  # record elapsed seconds as progress

Calls to the test method:

sleep_test()
time.sleep(10)
print(background_pools)  # one thread running; its context shows the elapsed time
sleep_test()
time.sleep(10)
print(background_pools)  # two threads running
time.sleep(10)
sleep_test()
sleep_test()  # pool_max=3, so this fourth call is normally rejected
print(background_pools)
time.sleep(10)
print(background_pools)
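
To actually read the progress back out, something like this minimal sketch could walk the pool; report_progress is a hypothetical helper, not part of the code above:

def report_progress():
    # Each pool entry maps a thread id to {"thread": ..., "context": ...};
    # sleep_test records its elapsed seconds under context['time'].
    for thread_id, entry in list(background_pools['sleep_test'].items()):
        print(f"thread {thread_id}: {entry['context'].get('time', 0)}s elapsed")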

Upvotes: 2
