Reputation: 295
I am trying to write a function that runs as a separate process and keeps acquiring data until I tell it to stop from the main process.
Here is how it works:
e = multiprocessing.Event()

def recordLeverPos(e, name): #e = multiprocessing.Event()
    posData = []
    while True:
        datBuffer = str(lever.readCounter()) + '\n'
        posData.append(datBuffer)
        if e.is_set() == True:
            with open('%s_leverPos.tsv' %(name), 'a') as file:
                for i in range(len(posData)):
                    posData[i] = posData[i]
                    file.write(posData[i])
            print 'Done Writing to leverPos file.'
            while e.is_set() == True:
                sleep(0.01)
            posData = []

p = mp.Process(target=recordLeverPos, args = (e, name))

def trialStart():
    global e
    #While trials is going on, some more code regarding the correct task that must be performed.
    e.set() #Indicate to the process that it's time to flush the buffer to the tsv file.
    #Depending on conditions, trial may continue to trial stop a function in which the patient must not do anything to the lever, but I still need to record the lever position.
    e.clear() #Indicate to process to get out of the while loop and start recording a new leverPos buffer.
    trialStop()

def trialStop():
    global e
    #Patient must not do anything here.... but if he/she does.
    if (lever.readCounter > threshold): #Patient moved it.
        e.set() #Indicate to thread it's time to flush the buffer again.
        e.clear() #Indicate to thread you can start with a new buffer.
    #However the problem seems to be that when I call the e.set() in this function the process does not receive the event and does not save the Buffer...
    #there's a timer here, if the timer passes I still want to record the position of the lever.

def main():
    p.start() #Begin adding leverPos to the buffer.
    trialStart()
So I call that function and make it into a process that runs separately from my main function.
p = mp.Process(target=recordLeverPos, args = (e, name))
p.start()
The function recordLeverPos basically records positions of a lever by appending them to a list buffer. When the trial is over I call e.set(), which triggers the next part of the function: copying the list buffer to a TSV file. The process then waits until I call e.clear().
Here's the problem: I can't seem to get consistent clearing of the event (it's worth noting that I made p and e global variables so that they can be accessed from other functions that run certain stages of the trials). Whenever I call e.set(), it only seems to work in two of the four places where I call it.
My question is, why is this happening? Is there a better way to be able to do this in a way that I can call it globally?
Is there a better way to communicate with processes? I tried searching myself, but I couldn't figure out how to use picklable objects, and to be honest, the Event class seemed more intuitive to me, but I can't get it to work the way I'd expect.
Let me know if you would like a bit more code. I simplified my code as much as I could so that you get the main idea without wasting time working out how the rest of it works.
Upvotes: 1
Views: 449
Reputation: 15180
Here's some sample code that allows data to trickle in, then when an event is signalled, the data is written to a file. It's freestanding and has no dependencies.
Point to note:
When the Event is checked with is_set(), it's immediately cleared. This lets the parent process set() it again.
That's pretty much it; the original code was nearly complete!
import multiprocessing, sys, time

record_ev = multiprocessing.Event()
name = 'beer'

def recordLeverPos(rec_ev, name): #e = multiprocessing.Event()
    posData = []
    lever = iter(xrange(999))
    while True:
        datBuffer = str(lever.next()) + '\n'
        posData.append(datBuffer)
        print 'posData',posData
        if rec_ev.is_set():
            rec_ev.clear()
            with open('%s_leverPos.tsv' % name, 'a') as dataf:
                for dat in posData:
                    dataf.write(dat)
            print 'wrote: {}'.format(posData)
            print 'Done Writing to leverPos file.'
            posData = []
        time.sleep(1)

record_proc = multiprocessing.Process(
    target=recordLeverPos, args = (record_ev, name)
)
record_proc.start()

time.sleep(2)
print 'triggering record'
record_ev.set()

time.sleep(2)
print 'triggering record #2'
record_ev.set()

time.sleep(2)
record_proc.terminate()
record_proc.join()
sys.exit(0)
Output:
posData ['0\n']
posData ['0\n', '1\n']
triggering record
posData ['0\n', '1\n', '2\n']
wrote: ['0\n', '1\n', '2\n']
Done Writing to leverPos file.
posData ['3\n']
triggering record #2
posData ['3\n', '4\n']
wrote: ['3\n', '4\n']
Done Writing to leverPos file.
posData ['5\n']
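For readers on Python 3, the same handshake can be sketched as below. In the question's trialStop(), e.set() is followed straight away by e.clear(), so a child that only polls is_set() once per loop can miss the signal entirely; letting the child do the clearing avoids that. This is only a sketch under assumptions, not the original poster's code: drain_on_signal, record_positions, read_fake_lever, run_demo and the second stop_ev event are hypothetical names introduced here, and read_fake_lever merely stands in for lever.readCounter().

```python
import multiprocessing
import time


def drain_on_signal(flush_ev, buffer, sink):
    """If flush_ev is set, clear it, write the buffer to sink and empty it.

    Returns True when a flush happened. Only the child clears the event,
    so the parent's set() cannot be undone before the child has seen it.
    """
    if not flush_ev.is_set():
        return False
    flush_ev.clear()
    sink.writelines(buffer)
    sink.flush()
    buffer.clear()
    return True


def read_fake_lever():
    # Hypothetical stand-in for lever.readCounter().
    return int(time.time() * 1000) % 1000


def record_positions(flush_ev, stop_ev, path, read_position, period=0.01):
    """Child-process loop: sample, flush when asked, exit when told to stop."""
    buffer = []
    with open(path, 'a') as sink:
        while not stop_ev.is_set():
            buffer.append('%s\n' % read_position())
            drain_on_signal(flush_ev, buffer, sink)
            time.sleep(period)
        # Final flush so samples taken since the last signal are kept.
        sink.writelines(buffer)


def run_demo(path):
    flush_ev = multiprocessing.Event()
    stop_ev = multiprocessing.Event()
    proc = multiprocessing.Process(
        target=record_positions,
        args=(flush_ev, stop_ev, path, read_fake_lever),
    )
    proc.start()
    time.sleep(0.2)
    flush_ev.set()   # the parent only ever sets; it never clears
    time.sleep(0.2)
    flush_ev.set()
    stop_ev.set()    # ask the child to finish instead of terminate()
    proc.join()
```

To try it, call run_demo('beer_leverPos.tsv') from under an `if __name__ == '__main__':` guard; the guard is needed on platforms where multiprocessing starts children by spawning a fresh interpreter rather than forking. Using a separate stop event instead of terminate() gives the child a chance to write out whatever is still in its buffer.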
Upvotes: 2