Reputation: 369
So I have been playing around with multiprocessing, and I wanted to push my knowledge further: have process 1 read the first line from a text file, process 2 the second line, and so on...
txt file:
[email protected]
[email protected]
[email protected]
[email protected]
[email protected]
and this is how the code looks:
def info(thread):
    global prod
    prod = int(thread) + 1
    runit()

def runit():
    log("Profile-" + str(prod) + Fore.GREEN + ' - ' + email)
    # From here I can then use the email for each worker. Or that's the plan at least:
    # every worker will have its own email that can be used in here.
    sys.exit()
def main():
    user_input = 0
    while True:
        try:
            user_input = int(input(Fore.WHITE + 'How many tasks do you wanna run? [NUMBERS] \n' + Fore.RESET))
        except ValueError:
            print(Fore.RED + "Stop being stupid" + Fore.RESET)
            continue
        else:
            with open('email.txt') as f:
                content = f.readlines()
            content = [x.strip('\n') for x in content]
            try:
                for i, email in enumerate(content):
                    print(email)
            except ValueError as e:
                print(e)
            HowManyThread = user_input
            i = 0
            jobs = []
            for i in range(HowManyThread):
                p = multiprocessing.Process(target=info, args=(str(i),))
                jobs.append(p)
                time.sleep(.5)
                p.start()
            for p in jobs:
                p.join()
            sys.exit()
log is just a logging helper, nothing special.
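A hypothetical stand-in for it, just so the snippet above is self-contained:

def log(message):
    # hypothetical stand-in for the real logging helper; it just prints
    print(message)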
However, I have no idea at all what to do to actually make each process take its own email row. So basically...
Process-1 to take [email protected]
Process-2 to take [email protected]
Process-3 to take [email protected]
Process-4 to take [email protected]
Process-5 to take [email protected]
Any suggestions on how I can do this? I'm completely lost and have no idea how to move forward.
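A minimal sketch of the mapping being described, assuming the plain print stand-in instead of log and the colour codes: each Process is simply handed its own email as an argument.

import multiprocessing

def runit(prod, email):
    # each worker receives its own email as an argument
    print("Profile-" + str(prod) + ' - ' + email)

def main():
    with open('email.txt') as f:
        emails = [line.strip() for line in f]
    jobs = []
    for i, email in enumerate(emails):
        # process i+1 takes the i-th email row
        p = multiprocessing.Process(target=runit, args=(i + 1, email))
        jobs.append(p)
        p.start()
    for p in jobs:
        p.join()

if __name__ == '__main__':
    main()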
Update
from multiprocessing import Pool
from tqdm import tqdm

with open('email.txt') as f:
    content = f.readlines()

global email_list
email_list = [x.strip('\n') for x in content]

def info(thread):
    global prod
    prod = int(thread) + 1
    runit()

def runit(email_index):
    email = email_list[email_index]
    log("Profile-" + str(prod) + Fore.GREEN + ' - ' + email)
    sys.exit()

def main():
    wipe()
    text()
    Infotext = "First name : Last name : Email: : Random char + Street"
    with open('data.json', 'w') as f:
        json.dump(Infotext, f)
        f.write("\n")
    with Pool(8) as pool:
        result_list = list(tqdm(pool.imap_unordered(runit, range(len(email_list)),
                                                    chunksize=5),
                                total=len(email_list)))

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        print(e)
        traceback.print_exc()
Upvotes: 0
Views: 67
Reputation: 1103
The following approach delegates the multiprocessing to a pool of workers, each of which receives a chunk of indices and processes these indices a single line at a time (the choice of poolsize=8 and chunksize=5 here is arbitrary and can be tuned according to your requirements).
The results of all workers are then collected into a final list. Note that imap_unordered is only appropriate if you don't care about the order in which the lines are processed (i.e. result_list does not maintain the original order of content).
from multiprocessing import Pool
# progress bar to track your multiproc
from tqdm import tqdm

with open('email.txt') as f:
    content = f.readlines()

# this list will be accessed by each worker
global email_list
email_list = [x.strip('\n') for x in content]

# define the function each worker will apply to an email:
# it gets sent an index into the list of emails,
# accesses the email at that index, performs its work and returns
def runit(email_index):
    email = email_list[email_index]
    # do the stuff you're interested in for a single email

# run the multiprocessing to get your results:
# this sends the indexes for the emails out to the workers
# and collects the results of runit into result_list
with Pool(8) as pool:
    result_list = list(tqdm(pool.imap_unordered(runit,
                                                range(len(email_list)), chunksize=5),
                            total=len(email_list)))
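If you do care about the order, pool.imap is the drop-in alternative: it yields results in input order, at some cost in pipelining. A minimal variation of the call above:

with Pool(8) as pool:
    # imap (rather than imap_unordered) keeps result_list aligned
    # with email_list, index for index
    result_list = list(tqdm(pool.imap(runit,
                                      range(len(email_list)), chunksize=5),
                            total=len(email_list)))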
Upvotes: 1
Reputation: 149165
What you need is a pool of worker processes - even if, for your use case, I really wonder whether threads (i.e. multiprocessing.dummy) would not be enough.
A pool starts the requested number of worker processes, and you can submit asynchronous tasks to the pool that will be handled by the first free worker.
A stripped-down version of your example (no fancy printing, no unnecessary reading of a sequential file into a list) could be:
import multiprocessing
import time

def runit(prod, email):
    print("Profile-" + str(prod) + ' - ' + email)
    # From here you can then use the email for each worker:
    # every worker has its own email that can be used in here.
    # sys.exit() # NEVER CALL sys.exit() EXPLICITLY in a worker process
    time.sleep(1) # to add a delay inside each task

def main():
    while True:
        try:
            HowManyThread = int(input(
                'How many tasks do you wanna run? [NUMBERS] \n'))
        except ValueError:
            print("Stop being stupid")
            continue
        if HowManyThread == 0:
            break
        pool = multiprocessing.Pool(HowManyThread)
        with open('email.txt') as f:
            for i, email in enumerate(f):
                email = email.strip()
                # runit will be run by a worker process
                pool.apply_async(runit, (i, email))
        pool.close() # no more tasks to add
        pool.join() # wait for the last worker to end

if __name__ == "__main__":
    main()
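And for the thread-based variant mentioned above, multiprocessing.dummy exposes the same Pool API backed by threads, so only the pool construction changes - a sketch reusing the runit above (the pool size of 4 is an arbitrary choice):

import multiprocessing.dummy

def main():
    # same Pool API as multiprocessing.Pool, but backed by threads;
    # enough if the per-email work is I/O-bound rather than CPU-bound
    pool = multiprocessing.dummy.Pool(4)
    with open('email.txt') as f:
        for i, email in enumerate(f):
            pool.apply_async(runit, (i, email.strip()))
    pool.close()
    pool.join()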
Upvotes: 0