Benny

Reputation: 179

Python 3.9 Watchdog, start process for each event

I have written some Python classes to process files when they are created, using Python's watchdog module. Whenever a new file event is triggered, a process should be started to handle the processing of that file. Files arrive from different sources at the same time, so I would expect multiple processes to handle some files "simultaneously".

According to syslog, however, only one process is ever created, so there is no multiprocessing. I only see lines like

Sep 21 13:53:02 host1 python3.9[6704] : ...

all with exactly the same PID (6704 in this case).

Can anybody give me a hint about what I am doing wrong here?

Content of fileMonitor.py:

import time
from watchdog.observers.polling import PollingObserver
from watchdog.events import RegexMatchingEventHandler

import converter

##
## \brief Class to handle monitored events
##
class LogFileEventHandler(RegexMatchingEventHandler):

    MONITOR_REGEX = [r'.*\.(gz|txt)$']  # watch for new "*.gz" or "*.txt" files only
    IGNORE_REGEX = [r'.*/archive/*']    # ignore events below path "*/archive/*"

    ###
    ### Public methods
    ###

    def __init__(self):
        super().__init__(
            regexes=self.MONITOR_REGEX,
            ignore_regexes=self.IGNORE_REGEX,
            ignore_directories=True,
            case_sensitive=False)
        self.cm = converter.ConverterManager()

    def on_created(self, event):
        self.cm.convertMemory(event.src_path)

    def on_moved(self, event):
        self.cm.convertMemory(event.dest_path)

##
## \brief Class to monitor changes in filesystem
## \note Has to use PollingObserver because the monitored path is on a network filesystem.
##       There is no OS API that provides change notifications for network filesystems.
##
class LogFileMonitor:

    ###
    ### Public methods
    ###

    def __init__(self, monitorPath):
        self.monitorPath = monitorPath                     # Path to monitor
        self.handler = LogFileEventHandler()               # Handler for events occurred
        self.observer = PollingObserver()                  # Method for monitoring

    def run(self):
        self._start()                                      # Prepare observer
        try:
            while True:
                time.sleep(1)                              # Suspend for some time
        except KeyboardInterrupt:
            self._stop()                                   # Terminate observer

    ###
    ### Private methods
    ###

    def _start(self):
        self.observer.schedule(                            # prepare observer
            event_handler=self.handler,
            path=self.monitorPath,
            recursive=True,
        )
        self.observer.start()                              # start observer

    def _stop(self):
        self.observer.stop()                               # stop observer
        self.observer.join()                               # wait till observer terminated

Content of converter.py:

import time
import concurrent.futures

##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:

    ###
    ### Public methods
    ###

    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        time.sleep(30) # do some long-running work here
    
##
## \brief Class to manage converter workers
##
class ConverterManager:

    ###
    ### Public methods
    ###

    def __init__(self):
        print('Created new instance of ConverterManager')

    def convertMemory(self, logFile):
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: # create a new process from pool
            executor.submit(self._task(logFile))                                # start worker-process

    ###
    ### Private methods
    ###

    def _task(self, logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()

Solved!

Thank you guys for the hints, they almost solved my problem. I also needed to declare the _task method static to make it finally work. I assume this is because submitting the bound method self._task would require pickling self, and the ConverterManager instance now holds the executor, which cannot be pickled.

Here is the modified code that works for me:

import time
import concurrent.futures

##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:

    ###
    ### Public methods
    ###

    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        print(f'Started process for {self.logFile}')
        time.sleep(10) # do some long-running work here
        print(f'Terminated process for {self.logFile}')

##
## \brief Class to manage converter workers
##
class ConverterManager:

    ###
    ### Public methods
    ###

    def __init__(self):
        print('Created new instance of ConverterManager')
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

    def convertMemory(self, logFile):
        self.executor.submit(self._task, logFile)                                # start worker-process

    ###
    ### Private methods
    ###
    @staticmethod
    def _task(logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()


if __name__ == '__main__':
    cm = ConverterManager()

    for i in range(30):
        cm.convertMemory(i)

Upvotes: 0

Views: 783

Answers (1)

Weeble

Reputation: 17890

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    executor.submit(self._task(logFile))                               

This with statement is broadly equivalent to:

executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
try:
    executor.submit(self._task(logFile))
finally:
    executor.shutdown()

This is described in the documentation for concurrent.futures.Executor.shutdown. Since each executor has only one task submitted to it and is then immediately shut down (causing the caller to block until the work finishes), there is no opportunity for tasks to run concurrently.

An alternative is to create a single executor shared by all your tasks and to call shutdown on it only at the end of your program.
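
A minimal sketch of that approach, assuming a placeholder do_work function that stands in for the real conversion work:

import concurrent.futures
import time

def do_work(item):
    time.sleep(1)                   # stand-in for the real conversion work
    return item

if __name__ == '__main__':
    # One executor for the whole program; tasks submitted to it can run
    # concurrently in up to four worker processes.
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    futures = [executor.submit(do_work, i) for i in range(8)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
    executor.shutdown()             # shut down only once, at the end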


Separately from this, when you do:

executor.submit(self._task(logFile))

...that's equivalent to:

result = self._task(logFile)   # result becomes None
executor.submit(None)

So it's actually doing the work immediately and not even submitting it to the executor.

You probably instead want to do this:

executor.submit(self._task, logFile)

...as described in the concurrent.futures.Executor.submit documentation. You pass it the function you want the worker process to call and all the arguments you want it to be called with, but importantly, you don't call it yourself.
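
As a small illustration (the task function below is a made-up placeholder, not part of the original code), submit returns a Future whose result() blocks until the worker has finished:

import concurrent.futures

def task(logFile):
    return f'processed {logFile}'

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(task, 'a.gz')  # schedules task('a.gz') in a worker
        print(future.result())                  # blocks, then prints: processed a.gz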

Upvotes: 1
