Charchit Agarwal
Charchit Agarwal

Reputation: 3777

Fixing the issue where upon calculating how frequently a function is called during multiprocessing it returns a negative value

I have a function foo() which might be accessed by multiple worker processes concurrently. This function blocks until an output is ready, and then returns it. A sample foo is below:

import random
from time import sleep

def foo():
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))
    
    output = 'result of some logic'
    return output

I had a need to calculate how frequently (rate) this function is called (For example once every 15 seconds). However, I do not want this calculated rate to include the time spent in the actual function (since foo may block for long time). To do this with only 1 worker, I ran this:

import random
import time
from time import sleep

call_rate = {'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0}

def foo():
    global call_rate
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'
    time_waited = time.time() - enter_time

    # Add the time since last function call, and remove time spent inside the function
    call_rate['total_time'] += time.time() - call_rate['last_call'] - time_waited
    call_rate['last_call'] = time.time()
    call_rate['total_calls'] += 1

    # calculate rate
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    return output

def worker(num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo()

worker(3)

# Output: 1.005s . As expected since worker waits 1s before each call
print('foo called once every {}s'.format(call_rate['rate']))  

Basically, I calculated the total time-differences between consecutive calls, and, after subracting the time spent within the function, divided that by the total number of calls (rate = total_time / total_calls)

But when I run this with multiple workers the output is negative:

import random
import time
from time import sleep
from multiprocessing import Manager, Process


def foo(call_rate):

    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'
    time_waited = time.time() - enter_time

    # Add the time since last function call, and remove time spent inside the function
    call_rate['total_time'] += time.time() - call_rate['last_call'] - time_waited
    call_rate['last_call'] = time.time()
    call_rate['total_calls'] += 1

    # calculate rate
    call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    return output

def worker(num, call_rate):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})
    
    w = []
    
    # Create 3 worker processes that run foo() thrice 
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate,)))
        w[i].start()
    for i in range(3):
        w[i].join()
        
    # Output: -0.97s 
    print('foo called once every {}s'.format(call_rate['rate'])) 

I can kind-of understand why the output is negative. Because there are now multiple processes, the time difference between each consecutive function call becomes smaller and smaller and subtracting the time spent wihtin the function of one process doesn't make much sense now because the function calls can now be from different processes. So my question is how can I get the output in the second case as approximately 0.3s (since there are 3 workers calling the method concurrently with 1s delay) without knowing the number of workers running?

Disclaimer I have already asked (a quite crude varient of) this question before here. However, before posting this question, I read the meta discussions here and here. The reason I believe this question is not a duplicate of my previous one is because it focuses on a much more smaller, better explained issue rather than then my original question which was much broader and failed to explain itself clearly. My aim at that time was to not only seek an answer for this query, but alternatives in my broader approach itself, which led it to becoming vague and arcane. Unlike previously, I have given reproducible code focused on a single, explicit issue and this question as a whole has more useful applications.

Upvotes: 0

Views: 147

Answers (2)

Charchit Agarwal
Charchit Agarwal

Reputation: 3777

I found a way to do it without asking for the number of workers running:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock

def foo(call_rate, lock):
    # Shift this to the start of the function
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice 
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    # Output: 0.354s 
    print('foo called once every {}s'.format(call_rate['rate']))

I will explain why this works. In the original code, the last call time was recorded AFTER the function had blocked. This meant that the time spent in the function need to be subtracted. But, as @Booboo had already pointed out in the comment to their answer, this was problematic because there maybe multiple workers running and we can't just subtract the waiting time EACH worker spends in the function.

A simple workaround to this is to record the last call time at the start of the function, where the time spent within the function has not yet been added. But it still doesn't solve the broader problem because the next time foo() will be called from the worker, it will include the time spent within the function from the last call, leaving us at square one again. But this, and I don't know why I didn't see this before, can be fixed very simply; by adding this line just before the function exits:

call_rate['last_call'] = time.time()

This makes sure that when the function exits, the last call is refreshed such that it seems the worker did not spend any time in the function at all. This approach does not require subtracting anything and thats why it works.

I did a test where I ran this 10 times and calculated some statistics using the code below:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics


def foo(call_rate, lock):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    # Mimic blocking of function
    sleep(2)

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(10):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Avergae is : {}".format(statistics.mean(avgs)))

This outputs:

Highest is : 0.35980285538567436
Lowest is : 0.3536567423078749
Avergae is : 0.356808172331916

As a 'proof' that the above code does ignore the time spent within the function, you can make the function block for a larger time, say 15s, and the output will still be approximately the same.

Update

The reason why the frequency is not 0.3s when the function blocks for a varying time has to do with when the workers enter and exit foo(). Consider the code below where two workers are run once which execute foo() twice and output call_rate every enter and exit of foo() along with a unique id to identify the worker:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics
import string

def foo(call_rate, lock, id):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
        print("{} entered, call rate {}".format(id, call_rate))
    # Mimic blocking of function
    sleep(1)

    output = 'result of some logic'

    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
        print("{} exited, call rate {}".format(id, call_rate))
    return output


def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def worker(num, call_rate, lock):
    id = id_generator()
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock, id)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(2):
        w.append(Process(target=worker, args=(2, call_rate, lock, )))
        w[i].start()
    for i in range(2):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(1):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Avergae is : {}".format(statistics.mean(avgs)))

Note that in this code, foo() always blocks for 1s. The rate should be close to 0.5s since there are two workers present. Running this code:

Output #1:

XEC6AU entered, call rate {'rate': 1.1851444244384766, 'total_time': 1.1851444244384766, 'last_call': 1624950732.381014, 'total_calls': 1}
O43FUI entered, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950732.4325447, 'total_calls': 2}
XEC6AU exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4327667, 'total_calls': 2}
O43FUI exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4484024, 'total_calls': 2}
XEC6AU entered, call rate {'rate': 0.7401185035705566, 'total_time': 2.22035551071167, 'last_call': 1624950734.433083, 'total_calls': 3}
O43FUI entered, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950734.4487064, 'total_calls': 4}
XEC6AU exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4333804, 'total_calls': 4}
O43FUI exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4958992, 'total_calls': 4}
Highest is : 0.558994710445404
Lowest is : 0.558994710445404
Avergae is : 0.558994710445404

The rate is 0.5s, which should be expected. Notice how both the workers enter and exit the functions simultaneously. Now after changing the function blocking time from 1s to random.randint(1, 10), this is what I get:

Output #2

NHXAKF entered, call rate {'rate': 1.1722326278686523, 'total_time': 1.1722326278686523, 'last_call': 1624886294.4630196, 'total_calls': 1}
R2DD8H entered, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886294.478649, 'total_calls': 2}
NHXAKF exited, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886300.4648588, 'total_calls': 2}
NHXAKF entered, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886301.465171, 'total_calls': 3}
R2DD8H exited, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886302.4811018, 'total_calls': 3}
R2DD8H entered, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886303.4813821, 'total_calls': 4}
NHXAKF exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886304.4660738, 'total_calls': 4}
R2DD8H exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886307.4826, 'total_calls': 4}
Highest is : 0.7971136569976807
Lowest is : 0.7971136569976807
Avergae is : 0.7971136569976807

The rate, unlike before, is almost 0.8. Moreover, both workers are no longer entering and exiting the function together either. This is ofcourse due to one blocking for a longer time than the other. But because they are no longer in sync, they are waiting for 1s at separate times instead of together inside of the worker() function. You can even see that in the call_rate['total_time']. For Output #1, where the workers are in sync, it is ~2s, while for Output #2 it is ~3s. And hence the difference in rates. So the 0.8s is the true rate of the workers calling foo() in this scenario, not the assumed 0.5s. Multiplying the rate by the number of processes would miss this nuance.

Upvotes: 0

Booboo
Booboo

Reputation: 44203

Update

For good measure you should probably be ensuring that foo is updating the call_rate dictionary under a Lock instance to handle concurrent access now that you are running multiple processes. But the real problem is that value last_call needs to be maintained for each process and cannot be shared among the processes.

This solution uses a managed class, WorkerManager that is able to keep track of all the created processes as long as method init_process is called for each process passing its process id immediately after it's started as in the code below. Then all a worker function has to do is call method update_statistics passing the wait times for each of its requests it is processing. A call to get_statistics will return the statistics.

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock, current_process
from multiprocessing.managers import BaseManager

class WorkerManager:
    def __init__(self):
        self._total_calls = 0
        self._total_time = 0.0
        self._rate = 0.0
        self._lock = Lock()
        self._call_times = {}

    def init_process(self, pid):
        self._call_times[pid] = time.time()

    def update_statistics(self, pid, wait_time):
        now = time.time()
        time_elapsed = now - self._call_times[pid]
        execution_time = time_elapsed - wait_time
        self._call_times[pid] = now
        with self._lock:
            self._total_calls += 1
            self._total_time += execution_time
            self._rate = self._total_time / (self._total_calls * len(self._call_times))

    def get_statistics(self):
        return {'rate': self._rate, 'total_time': self._total_time, 'total_calls': self._total_calls}

class WorkerManagerManager(BaseManager):
    pass

WorkerManagerManager.register('WorkerManager', WorkerManager)


def foo(worker_manager):
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'

    wait_time = time.time() - enter_time

    pid = current_process().pid
    worker_manager.update_statistics(pid, wait_time)

    return output

def worker(worker_manager, num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(worker_manager)

if __name__ == '__main__':
    with WorkerManagerManager() as m:
        worker_manager = m.WorkerManager()
        processes = [Process(target=worker, args=(worker_manager, 3)) for _ in range(3)]
        for p in processes:
            p.start()
            worker_manager.init_process(p.pid)
        for p in processes:
            p.join()
        statistics = worker_manager.get_statistics()
        print('foo called once every {}s'.format(statistics['rate']))

Prints:

foo called once every 0.34751895621970846s

How to use a process pool

And if you want to use a process pool, this is how you might use a pool size of 3 to submit 6 tasks:

import random
import time
from time import sleep
from multiprocessing import Manager, Pool, Lock, current_process
from multiprocessing.managers import BaseManager
from functools import partial

class WorkerManager:
    def __init__(self):
        self._total_calls = 0
        self._total_time = 0.0
        self._rate = 0.0
        self._lock = Lock()
        self._call_times = {}

    def init_process(self, pid):
        self._call_times[pid] = time.time()

    def update_statistics(self, pid, wait_time):
        now = time.time()
        time_elapsed = now - self._call_times[pid]
        execution_time = time_elapsed - wait_time
        self._call_times[pid] = now
        with self._lock:
            self._total_calls += 1
            self._total_time += execution_time
            self._rate = self._total_time / (self._total_calls * len(self._call_times))

    def get_statistics(self):
        return {'rate': self._rate, 'total_time': self._total_time, 'total_calls': self._total_calls}

class WorkerManagerManager(BaseManager):
    pass

WorkerManagerManager.register('WorkerManager', WorkerManager)


def pool_init(worker_manager):
    worker_manager.init_process(current_process().pid)

def foo(worker_manager):
    enter_time = time.time()
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'

    wait_time = time.time() - enter_time

    pid = current_process().pid
    worker_manager.update_statistics(pid, wait_time)

    return output

def worker(worker_manager, num):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(worker_manager)

if __name__ == '__main__':
    with WorkerManagerManager() as m:
        worker_manager = m.WorkerManager()
        pool = Pool(3, initializer=pool_init, initargs=(worker_manager,))
        # run 6 tasks
        pool.map(partial(worker, worker_manager), range(6))
        statistics = worker_manager.get_statistics()
        print('foo called once every {}s'.format(statistics['rate']))

Prints:

foo called once every 0.333592324786716s

Upvotes: 1

Related Questions