CDNthe2nd

Reputation: 369

Python - Multiprocessing: give each process an email from a text file

So I have been playing around with multiprocessing, and I wanted to take my knowledge a step further: have process 1 read the first line of a text file, process 2 the second line, and so on...

txt file:

[email protected]
[email protected]
[email protected]
[email protected]
[email protected]

and this is what the code looks like:

import multiprocessing
import sys
import time

from colorama import Fore


def info(thread):
    global prod
    prod = int(thread) + 1
    runit()


def runit():

    log("Profile-" + str(prod) + Fore.GREEN + ' - ' + email)
    # From here I can then use the email for each worker. Or that's the plan at least: every worker should have its own email that can be used in here.
    sys.exit()

def main():
    user_input = 0
    while True:
        try:
            user_input = int(input(Fore.WHITE + 'How many tasks do you wanna run? [NUMBERS] \n' + Fore.RESET))
        except ValueError:
            print(Fore.RED + "Stop being stupid" + Fore.RESET)
            continue
        else:
            with open('email.txt') as f:
                content = f.readlines()
            content = [x.strip('\n') for x in content]

            try:
                for i, email in enumerate(content):
                    print(email)

            except ValueError as e:
                print(e)

            HowManyThread = user_input
            jobs = []
            for i in range(HowManyThread):
                p = multiprocessing.Process(target=info, args=(str(i),))
                jobs.append(p)
                time.sleep(.5)
                p.start()

            for p in jobs:
                p.join()

            sys.exit()

log is just a logging helper, nothing special.

Fore.<COLOR> comes from Colorama.
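For reference, a minimal stand-in for log so the snippets above are runnable (the real helper isn't shown; this assumed version just prints with a timestamp):

import datetime

# hypothetical stand-in for the log() helper used above
def log(message):
    timestamp = datetime.datetime.now().strftime('%H:%M:%S')
    print('[' + timestamp + '] ' + message)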

However, I have no idea what I should do to actually make each process take its own email row. So basically...

Process-1 to take [email protected]
Process-2 to take [email protected]
Process-3 to take [email protected]
Process-4 to take [email protected]
Process-5 to take [email protected]

What are your suggestions on how I can do this? I'm completely stuck and have no idea how to move forward.


Update

import json
import traceback

from colorama import Fore
from multiprocessing import Pool
from tqdm import tqdm

with open('email.txt') as f:
    content = f.readlines()

global email_list
email_list = [x.strip('\n') for x in content]


def runit(email_index):
    email = email_list[email_index]

    log("Profile-" + str(email_index + 1) + Fore.GREEN + ' - ' + email)
    # no sys.exit() here: raising SystemExit inside a pool worker kills the worker



def main():
    wipe()
    text()
    Infotext = "First name : Last name : Email: : Random char + Street"
    with open('data.json', 'w') as f:
        json.dump(Infotext, f)
        f.write("\n")

    with Pool(8) as pool:
        result_list = list(tqdm(pool.imap_unordered(runit, range(len(email_list)),
                                                    chunksize=5),
                                total=len(email_list)))


if __name__ == '__main__':
    try:
        main()

    except Exception as e:
        print(e)
        traceback.print_exc()

Upvotes: 0

Views: 67

Answers (2)

twolffpiggott

Reputation: 1103

The following approach delegates the multiprocessing to a pool of workers, each of which receives a chunk of indices and processes these indices one line at a time (the choice of poolsize=8 and chunksize=5 here is arbitrary and can be tuned to your requirements).

The results of all workers are then collected into a final list. Note that imap_unordered is only appropriate if you don't care about the order in which the lines are processed (i.e. result_list does not maintain the original order of content).

from multiprocessing import Pool
# progress bar to track your multiproc
from tqdm import tqdm

with open('email.txt') as f:
    content = f.readlines()

# this list will be accessed by each worker
global email_list
email_list = [x.strip('\n') for x in content]

# define function that worker will apply to each email
# it gets sent an index for the list of emails
# it accesses the email at that index, performs its function and returns
def runit(email_index):
    email = email_list[email_index]
    # do the stuff you're interested in for a single email

# run the multiprocessing to get your results
# this sends the indexes for the emails out to the workers
# and collects the results of runit into result list
with Pool(8) as pool:
    result_list = list(tqdm(pool.imap_unordered(runit, range(len(email_list)),
                                                chunksize=5),
                            total=len(email_list)))
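
If the order of the results does matter, pool.imap is the order-preserving counterpart; a minimal sketch, reusing the same email_list and runit as above:

# imap yields results in input order, unlike imap_unordered,
# at the cost of possibly waiting on a slow early task
with Pool(8) as pool:
    ordered_results = list(tqdm(pool.imap(runit, range(len(email_list)),
                                          chunksize=5),
                                total=len(email_list)))
# ordered_results[i] now corresponds to email_list[i]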

Upvotes: 1

Serge Ballesta

Reputation: 149165

What you need is a pool of worker processes - even if, for your use case, I really wonder whether threads (or multiprocessing.dummy) would not be enough.

A pool starts the requested number of worker processes, and you can submit asynchronous tasks to the pool that will be handled by the first free worker.

A stripped-down version of your example (no fancy printing, no unnecessary reading of a sequential file into a list) could be:

import multiprocessing
import time

def runit(prod, email):

    print("Profile-" + str(prod) + ' - ' + email)
    # From here I can then use the email for each worker. Or that's the plan at least: every worker will have its own email that can be used in here.
    # sys.exit() # NEVER CALL sys.exit() EXPLICITLY in a worker process
    time.sleep(1) # to add a delay inside each task

def main():
    while True:
        try:
            HowManyThread = int(input(
                'How many tasks do you wanna run? [NUMBERS] \n'))
        except ValueError:
            print("Stop being stupid")
            continue

        if HowManyThread == 0: break

        pool = multiprocessing.Pool(HowManyThread)
        with open('email.txt') as f:
            for i, email in enumerate(f):
                email = email.strip()
            # runit will be run by a worker process
                pool.apply_async(runit, (i, email))

        pool.close()         # no more task to add
        pool.join()          # wait for last worker to end

if __name__ == "__main__":
    main()
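
For completeness, a minimal sketch of the thread-based variant hinted at above: multiprocessing.dummy exposes the same Pool API backed by threads, which is usually enough for I/O-bound work like this (only the import and the Pool construction change):

import time
# same Pool interface as multiprocessing, but backed by threads
from multiprocessing.dummy import Pool as ThreadPool

def runit(prod, email):
    print("Profile-" + str(prod) + ' - ' + email)
    time.sleep(1)  # simulate per-task work

def main():
    pool = ThreadPool(4)  # 4 threads; tune to your workload
    with open('email.txt') as f:
        for i, email in enumerate(f):
            pool.apply_async(runit, (i, email.strip()))
    pool.close()  # no more tasks to add
    pool.join()   # wait for all tasks to finish

if __name__ == "__main__":
    main()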

Upvotes: 0
