Rash

Reputation: 8242

Python: Notify when all files have been transferred

I am using the "watchdog" API to watch for changes in a folder on my filesystem. Whenever a file in that folder changes, I pass it to a particular function, which starts a thread for each file it receives.

But watchdog, like every other filesystem-watcher API I know of, notifies the user file by file, as each event comes in. I would like to be notified about a whole batch of files at once, so that I can pass the list to my function and take advantage of multithreading. Currently watchdog hands me one file at a time, so I can only pass a single file to my function; I want to pass it many files at a time to make multithreading worthwhile.

One solution that comes to mind: when you copy a bunch of files into a folder, the OS shows you a progress bar. If I could be notified when that progress bar finishes, that would solve my problem perfectly, but I don't know whether that is possible. Something along the lines of the sketch below is what I have in mind.
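This is only a rough sketch of the idea: treat a quiet period with no new events as the end of the copy, then hand the whole batch to my function. BatchingHandler, batcher, and QUIET_SECONDS are names I made up, not part of the watchdog API.

import Queue
import threading
import watchdog.events as events

QUIET_SECONDS = 2.0  # assume the copy is finished after this much silence

class BatchingHandler(events.FileSystemEventHandler):
    def __init__(self, queue):
        super(BatchingHandler, self).__init__()
        self.queue = queue
    def on_any_event(self, event):
        self.queue.put(event)

def batcher(queue, process_batch):
    while True:
        batch = [queue.get()]  # block until the first event of a burst arrives
        while True:
            try:
                # keep growing the batch while events keep arriving
                batch.append(queue.get(timeout=QUIET_SECONDS))
            except Queue.Empty:
                break  # no event for QUIET_SECONDS: the burst is over
        process_batch(batch)  # hand the whole list of events to my function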

I also know that watchdog is a polling API, and that an ideal filesystem-watching API would be interrupt-driven, like pyinotify. But I haven't found an API that is both interrupt-driven and cross-platform. iWatch is good, but Linux-only, and I want something that works on every OS. So if you have suggestions on any other API, please do let me know.

Thanks.

Upvotes: 0

Views: 1258

Answers (1)

unutbu

Reputation: 880757

Instead of accumulating filesystem events, you could spawn a pool of worker threads which get tasks from a common queue. The watchdog thread could then put tasks in the queue as filesystem events occur. Done this way, a worker thread can start working as soon as an event occurs.

For example,

import logging
import Queue
import threading
import time
import watchdog.observers as observers
import watchdog.events as events

logger = logging.getLogger(__name__)

class MyEventHandler(events.FileSystemEventHandler):
    def __init__(self, queue):
        super(MyEventHandler, self).__init__()
        self.queue = queue

    def on_any_event(self, event):
        # push every filesystem event onto the shared queue
        self.queue.put(event)

def process(queue):
    # worker thread: block until an event is available, then handle it
    while True:
        event = queue.get()
        logger.info(event)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    queue = Queue.Queue()
    num_workers = 4
    pool = [threading.Thread(target=process, args=(queue,))
            for i in range(num_workers)]
    for t in pool:
        t.daemon = True  # daemon threads won't keep the process alive on exit
        t.start()

    event_handler = MyEventHandler(queue)
    observer = observers.Observer()
    observer.schedule(
        event_handler,
        path='/tmp/testdir',
        recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

Running

% mkdir /tmp/testdir
% script.py

yields output like

[14:48:31 Thread-1] <FileCreatedEvent: src_path=/tmp/testdir/.#foo>
[14:48:32 Thread-2] <FileModifiedEvent: src_path=/tmp/testdir/foo>
[14:48:32 Thread-3] <FileModifiedEvent: src_path=/tmp/testdir/foo>
[14:48:32 Thread-4] <FileDeletedEvent: src_path=/tmp/testdir/.#foo>
[14:48:42 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/foo>
[14:48:47 Thread-2] <FileCreatedEvent: src_path=/tmp/testdir/.#bar>
[14:48:49 Thread-4] <FileCreatedEvent: src_path=/tmp/testdir/bar>
[14:48:49 Thread-4] <FileModifiedEvent: src_path=/tmp/testdir/bar>
[14:48:49 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/.#bar>
[14:48:54 Thread-2] <FileDeletedEvent: src_path=/tmp/testdir/bar>

Doug Hellmann has written an excellent set of tutorials on the Queue, threading, and multiprocessing modules (which have since been edited into a book) which should help you get started.

I didn't actually end up using a multiprocessing Pool or ThreadPool here, as his multiprocessing tutorials discuss, but you may find them useful anyway.
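For completeness, here is a rough sketch of how the ThreadPool variant could look with the same watchdog setup as above; PoolEventHandler and handle_event are hypothetical names, not from the tutorials.

from multiprocessing.pool import ThreadPool

import watchdog.events as events

def handle_event(event):
    # stand-in for whatever per-file work you need to do
    print(event)

pool = ThreadPool(processes=4)

class PoolEventHandler(events.FileSystemEventHandler):
    def on_any_event(self, event):
        # let the pool manage the worker threads instead of a hand-rolled loop
        pool.apply_async(handle_event, (event,))

You would schedule a PoolEventHandler with an Observer exactly as MyEventHandler is scheduled in the script above.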

Upvotes: 2
