ch0l1n3

Reputation: 295

Best way to communicate in multiprocessing between processes.

I am trying to write a function that runs as a separate process and keeps acquiring data until I tell it to stop from the main process.

Here is how it works:

import multiprocessing as mp
from time import sleep

e = mp.Event()

def recordLeverPos(e, name): #e = multiprocessing.Event()
    posData = []
    while True:
        datBuffer = str(lever.readCounter()) + '\n'
        posData.append(datBuffer)
        if e.is_set():
            with open('%s_leverPos.tsv' % name, 'a') as file:
                for line in posData:
                    file.write(line)
            print 'Done Writing to leverPos file.'
            while e.is_set():  # wait here until the main process calls e.clear()
                sleep(0.01)
            posData = []


p = mp.Process(target=recordLeverPos, args = (e, name))

def trialStart():
    global e

    #While trials is going on, some more code regarding the correct task that must be performed.
    e.set() #Indicate to the process that it's time to flush the buffer to the tsv file.


    #Depending on conditions, trial may continue to trial stop a function in which the patient must not do anything to the lever, but I still need to record the lever position.
    e.clear() #Indicate to process to get out of the while loop and start recording a new leverPos buffer.
    trialStop()

def trialStop():
    global e
    #Patient must not do anything here.... but if he/she does.
    if lever.readCounter() > threshold: #Patient moved it.
        e.set() #Indicate to the process it's time to flush the buffer again.
        e.clear() #Indicate to the process it can start with a new buffer.

        #However the problem seems to be that when I call the e.set() in this function the process does not receive the event and does not save the Buffer... 

    #there's a timer here, if the timer passes I still want to record the position of the lever.

def main():
    p.start() #Begin adding leverPos to the buffer.
    trialStart()

So I call that function and make it into a process that runs separately from my main function.

p = mp.Process(target=recordLeverPos, args = (e, name))
p.start()

The function recordLeverPos records the positions of a lever by appending them to a list buffer. When the trial is over I call e.set(), which triggers the next part of the function: copying the list buffer to a tsv file. The process then waits until I call e.clear().

Here's the problem: I can't seem to get consistent clearing of the event (it's worth noting that I made p and e global variables so that they can be accessed from the other functions that run certain stages of the trials). e.set() only seems to work in two of the four places where I call it. Why is this happening? Is there a better way to do this that I can still call globally?

Is there a better way to communicate with processes? I tried searching myself, but I couldn't figure out how to use picklable objects, and to be honest the Event class seemed more intuitive to me. It just doesn't behave the way I'd expect.

Let me know if you would like more code. I simplified it heavily so that you get the main idea without wasting time working out how the rest of my code works.

Upvotes: 1

Views: 449

Answers (1)

johntellsall

Reputation: 15180

Here's some sample code that allows data to trickle in, then when an event is signalled, the data is written to a file. It's freestanding and has no dependencies.

Point to note:

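  • after the Event is checked with is_set(), the worker immediately clears it itself. The parent only ever calls set(), so a signal can't be cleared before the worker has seen it, and the parent is free to set() it again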

That's pretty much it, the original code was nearly complete!
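To see why the back-to-back e.set() / e.clear() in trialStop() can go unnoticed, here is a minimal single-process sketch in Python 3 syntax. poll_once is a hypothetical stand-in for one pass of the worker loop, not part of the original code; it assumes the worker only gets to look at the flag after the parent has already run both calls, which is the likely case when they sit on consecutive lines.

```python
import multiprocessing

ev = multiprocessing.Event()


def poll_once(event):
    """Hypothetical stand-in for one pass of the worker loop.

    Returns True if the flag was set, and consumes it by clearing it.
    """
    if event.is_set():
        event.clear()
        return True
    return False


# Pattern from trialStop(): the parent sets and clears back to back.
# By the time the worker polls, the flag is already down again.
ev.set()
ev.clear()
missed = poll_once(ev)

# Pattern from this answer: the parent only sets, the worker clears.
ev.set()
seen = poll_once(ev)

print(missed, seen, ev.is_set())
```

The first poll reports nothing because the signal was withdrawn before anyone looked at it. In the second pattern the flag stays up until the worker has acted on it.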

source

import multiprocessing, sys, time

record_ev = multiprocessing.Event()
name = 'beer'

def recordLeverPos(rec_ev, name): # rec_ev is a multiprocessing.Event()
    posData = []
    lever = iter(xrange(999))
    while True:
        datBuffer = str(lever.next()) + '\n'
        posData.append(datBuffer)
        print 'posData',posData
        if rec_ev.is_set():
            rec_ev.clear()
            with open('%s_leverPos.tsv' % name, 'a') as dataf:
                for dat in posData:
                    dataf.write(dat)
            print 'wrote: {}'.format(posData)
            print 'Done Writing to leverPos file.'
            posData = []
        time.sleep(1)


record_proc = multiprocessing.Process(
    target=recordLeverPos, args = (record_ev, name)
)
record_proc.start()

time.sleep(2)
print 'triggering record'
record_ev.set()

time.sleep(2)
print 'triggering record #2'
record_ev.set()

time.sleep(2)
record_proc.terminate()
record_proc.join()
sys.exit(0)

output

posData ['0\n']
posData ['0\n', '1\n']
triggering record
posData ['0\n', '1\n', '2\n']
wrote: ['0\n', '1\n', '2\n']
Done Writing to leverPos file.
posData ['3\n']
triggering record #2
posData ['3\n', '4\n']
wrote: ['3\n', '4\n']
Done Writing to leverPos file.
posData ['5\n']
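
For anyone on Python 3, a variant of the sample above might look roughly like this. It is a sketch under a few assumptions: the second Event (stop_ev), the final flush on shutdown, the flush_buffer helper, the snake_case names and the temporary output directory are all additions that are not in the original code, and itertools.count() again stands in for lever.readCounter().

```python
import itertools
import multiprocessing
import os
import tempfile
import time


def flush_buffer(buf, path):
    """Append every buffered line to the file at path."""
    with open(path, "a") as dataf:
        dataf.writelines(buf)


def record_lever_pos(flush_ev, stop_ev, path, interval=0.05):
    """Buffer readings and write them out whenever flush_ev is set."""
    lever = itertools.count()  # stand-in for lever.readCounter()
    buf = []
    while not stop_ev.is_set():
        buf.append("%d\n" % next(lever))
        if flush_ev.is_set():
            flush_ev.clear()  # consume the signal so the parent can set() again
            flush_buffer(buf, path)
            buf = []
        stop_ev.wait(interval)  # sleep, but wake up early on shutdown
    flush_buffer(buf, path)  # write whatever is left before exiting


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "beer_leverPos.tsv")
    flush_ev = multiprocessing.Event()
    stop_ev = multiprocessing.Event()
    proc = multiprocessing.Process(
        target=record_lever_pos, args=(flush_ev, stop_ev, path)
    )
    proc.start()

    for _ in range(2):
        time.sleep(0.2)
        flush_ev.set()  # ask for a flush; the worker clears the flag

    time.sleep(0.2)
    stop_ev.set()  # ask the worker to finish instead of calling terminate()
    proc.join()

    with open(path) as dataf:
        print(dataf.read().split())
```

Using a second Event for shutdown rather than terminate() gives the worker a chance to write out the tail of its buffer, so no readings are dropped when the run ends. The exact number of readings depends on timing.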

Upvotes: 2
