Reputation: 15204
My structure (massively simplified) is depicted below:
import multiprocessing

def creator():
    # creates files
    return

def relocator():
    # moves created files
    return

create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)

create.start()
relocate.start()
What I am trying to do is have a bunch of files created by creator and, as soon as they get created, have them moved to another directory by relocator.
The reason I want to use multiprocessing here is that I don't want creator to wait for the moving to finish first, because moving takes time I don't want to waste. I want both the creator and relocator processes to be serial (one file at a time each) but run in parallel with each other. A "log" of the actions should look like this:
# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file
Based on what I have read, a Queue is the way to go here.
Strategy: (maybe not the best one?!)
After a file gets created, it will enter the queue, and after it has finished being relocated, it will be removed from the queue.
I am, however, having issues coding it: multiple files get created at the same time (multiple instances of creator running in parallel), among other problems...
I would be very grateful for any ideas, hints, explanations, etc.
Upvotes: 2
Views: 83
Reputation: 42708
Let's take your idea and split it into these features:
Creator should create files (100 for example)
Relocator should move 1 file at a time till there are no more files to move
Creator may end before Relocator, so it can also transform itself into a Relocator
Both have to know when to finish
So, we have 2 main functionalities:
import os
import shutil

def create(i):
    # creates a file and returns its path
    return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))

def relocate(src, dst):
    # moves a created file ("from" is a reserved word in Python,
    # so the parameters are named src/dst)
    shutil.move(src, dst)
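For illustration, a single create-then-move step with these helpers could look like this (the destination path is just an example, not anything from your setup):

path = create(1)                     # e.g. "some/path/based/on/stuff/1.ext"
relocate(path, "another/dir/1.ext")  # hypothetical destination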
Now let's create our processes:
from multiprocessing import Process, Queue

comm_queue = Queue()

# process that creates the files and pushes their paths into the queue
def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    # we tell the workers when to stop; we push just one flag
    # since we only have one other worker
    comm_q.put("STOP_FLAG")

# the relocator works until it gets a stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

creator_process = Process(target=creator, args=(comm_queue,))
relocator_process = Process(target=relocator, args=(comm_queue,))

creator_process.start()
relocator_process.start()
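One side note: on Windows (and on macOS with recent Python versions, where spawn is the default start method), the process setup has to live under an if __name__ == "__main__": guard, otherwise the child processes re-execute it on import. A minimal sketch of the same setup with the guard:

from multiprocessing import Process, Queue

if __name__ == "__main__":
    comm_queue = Queue()
    creator_process = Process(target=creator, args=(comm_queue,))
    relocator_process = Process(target=relocator, args=(comm_queue,))
    creator_process.start()
    relocator_process.start()
    creator_process.join()
    relocator_process.join()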
This way we now have a creator and a relocator. But let's say we want the Creator to start relocating once its creation job is done; we can just reuse relocator, but then we need to push one more "STOP_FLAG", since we would have 2 processes relocating:
def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(2):
        comm_q.put("STOP_FLAG")
    relocator(comm_q)
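The rule of thumb is one stop flag per consumer. If the number of relocating processes may change, a tiny helper (the name shut_down is just illustrative) keeps that count in one place:

STOP_FLAG = "STOP_FLAG"

def shut_down(comm_q, n_consumers):
    # push exactly one sentinel per consumer, so every relocator
    # eventually sees a STOP_FLAG and leaves its loop
    for _ in range(n_consumers):
        comm_q.put(STOP_FLAG)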
Let's say we now want an arbitrary number of relocator processes. We need to adapt our code a bit to handle this: the creator method has to know how many flags to push to notify the other processes when to stop. Our resulting code would look like this:
from multiprocessing import Process, Queue, cpu_count

comm_queue = Queue()

# process that creates the files and pushes their paths into the queue
def creator(comm_q, number_of_subprocesses):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(number_of_subprocesses + 1):  # we need to count ourselves
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

# the relocator works until it gets a stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

num_of_cpus = cpu_count()  # we will spawn as many processes as we have CPU cores
creator_process = Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue,)) for _ in range(num_of_cpus)]

creator_process.start()
for rp in relocators:
    rp.start()
Then you will have to WAIT for them to finish:
creator_process.join()
for rp in relocators:
    rp.join()
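As an alternative to counting stop flags, the standard library also offers multiprocessing.JoinableQueue, which lets the producer block until every queued item has been processed. A sketch of that variant, assuming the create/relocate helpers from above:

from multiprocessing import JoinableQueue, Process

def relocator(comm_q):
    while True:
        data = comm_q.get()
        relocate(data, to_path_you_may_want)
        comm_q.task_done()  # tells comm_q.join() this item is handled

if __name__ == "__main__":
    comm_queue = JoinableQueue()
    # daemon workers are killed automatically when the main process exits
    workers = [Process(target=relocator, args=(comm_queue,), daemon=True)
               for _ in range(4)]
    for w in workers:
        w.start()
    for i in range(100):
        comm_queue.put(create(i))
    comm_queue.join()  # blocks until every file has been relocated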
You may want to check the multiprocessing.Queue documentation, especially the get method (it is a blocking call by default):
Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available.
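That blocking behaviour is what lets relocator simply loop on get(): it sleeps until the creator puts something in. If you ever need to poll instead of block, get also accepts a timeout and raises queue.Empty when it expires (sketch):

import queue  # the Empty exception lives in the standard queue module

try:
    data = comm_q.get(timeout=1)  # wait at most 1 second
except queue.Empty:
    data = None  # nothing arrived in time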
Upvotes: 1