JayDough

Reputation: 91

AttributeError: Can't pickle local object 'computation.<locals>.function1' using multiprocessing queue

I have the following code using the schedule and multiprocessing modules:

def computation():
    def function1(q):
        while True:
            daydate = datetime.now()
            number = random.randrange(1, 215)
            print('Sent to function2: ({}, {})'.format(daydate, number))
            q.put((daydate, number))
            time.sleep(2)

    def function2(q):
        while True:
            date, number = q.get()
            print("Received values from function1: ({}, {})".format(date, number))
            time.sleep(2)

    if __name__ == "__main__":
        q = Queue()
        a = Process(target=function1, args=(q,))
        a.start()
        b = Process(target=function2, args=(q,))
        b.start()
        a.join()
        b.join()

schedule.every().monday.at("08:45").do(computation)
schedule.every().tuesday.at("08:45").do(computation)

while True:
    schedule.run_pending()
    time.sleep(1)

However, executing the code gives the following error:

AttributeError: Can't pickle local object 'computation.<locals>.function1'

And:

OSError: [WinError 87] The parameter is incorrect

How does one solve this problem? I've tried to solve it by defining the functions at the top level of the module, as stated in the documentation (https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled), but it still gives the same error.

Upvotes: 1

Views: 1508

Answers (1)

Darkonaut

Reputation: 21664

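Nested functions are not defined at the top level of a module, which is why you get the error. On Windows, multiprocessing starts child processes with the spawn method and has to pickle the target function, and pickle serializes functions only by their importable, module-level name. You need to move the definitions of function1 and function2 out of computation.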
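To see the difference in isolation, here is a minimal sketch that tries to pickle a module-level function and a nested one. The names top_level, outer, nested and can_pickle are made up for illustration and are not part of your code.

```python
import pickle


def top_level(x):
    # Defined at module level, so pickle can refer to it by name.
    return x + 1


def outer():
    def nested(x):
        # Defined inside outer(), so its qualified name is
        # 'outer.<locals>.nested' and pickle cannot look it up.
        return x + 1
    return nested


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        # Depending on the Python version, a local function raises
        # AttributeError or pickle.PicklingError.
        return False


if __name__ == "__main__":
    print("top_level picklable:", can_pickle(top_level))
    print("nested picklable:   ", can_pickle(outer()))
```

Run as a script, the first check should succeed and the second should fail, which is the same failure multiprocessing hits when it tries to send function1 to the child process.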

The way you wrote it, your processes would start immediately instead of at the time you scheduled them to run. The following probably does what you intended:

import os
import time
import random
from multiprocessing import Process, Queue
from threading import Thread
from datetime import datetime
import schedule


def function1(q):
    while True:
        daydate = datetime.now()
        number = random.randrange(1, 215)
        fmt = '(pid: {}) Sent to function2: ({}, {})'
        print(fmt.format(os.getpid(), daydate, number))
        q.put((daydate, number))
        time.sleep(2)


def function2(q):
    while True:
        date, number = q.get()
        fmt = "(pid: {}) Received values from function1: ({}, {})"
        print(fmt.format(os.getpid(), date, number))
        # time.sleep(2) no need to sleep here because q.get will block until
        # new items are available


def computation():
    q = Queue()
    a = Process(target=function1, args=(q,))
    a.start()
    b = Process(target=function2, args=(q,))
    b.start()
    a.join()
    b.join()


if __name__ == "__main__":

    # We are spawning new threads as a launching platform for
    # computation. Without it, the next job couldn't start before the last
    # one has finished. If your jobs always end before the next one should
    # start, you don't need this construct and you can just pass
    # ...do(computation)
    # A Thread object can only be started once, so each run has to create
    # a fresh Thread; the lambda does that every time the job fires.
    schedule.every().friday.at("01:02").do(
        lambda: Thread(target=computation).start()
    )
    schedule.every().friday.at("01:03").do(
        lambda: Thread(target=computation).start()
    )

    while True:
        schedule.run_pending()
        time.sleep(1)

As it is now, your processes will run forever once they have been started. If that's not what you want, you have to implement some stop condition.
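One possible stop condition, sketched under my own assumptions: the producer sends a fixed number of items and then puts a sentinel on the queue, and the consumer exits when it sees the sentinel. The max_items and delay parameters and the STOP marker are hypothetical additions, not something from your original code.

```python
import random
import time
from datetime import datetime
from multiprocessing import Process, Queue

STOP = None  # hypothetical sentinel meaning "no more items will follow"


def function1(q, max_items=3, delay=0.0):
    """Producer: send max_items tuples, then the sentinel."""
    for _ in range(max_items):
        q.put((datetime.now(), random.randrange(1, 215)))
        time.sleep(delay)
    q.put(STOP)


def function2(q):
    """Consumer: read until the sentinel arrives, return the items seen."""
    received = []
    while True:
        item = q.get()  # blocks until an item is available
        if item is STOP:
            break
        received.append(item)
    return received


def computation():
    q = Queue()
    a = Process(target=function1, args=(q,))
    b = Process(target=function2, args=(q,))
    a.start()
    b.start()
    a.join()  # returns once the producer has sent the sentinel
    b.join()  # returns once the consumer has seen it


if __name__ == "__main__":
    computation()
```

With this shape, a.join() and b.join() actually return, so a scheduled run finishes on its own. A multiprocessing.Event shared by both processes would be another option if the stop has to be triggered from outside.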

Upvotes: 1
