Khawar Islam
Khawar Islam

Reputation: 2914

Multiprocessing in python loop

I am generating negative pairs with the help of positive pairs. I would like to speed up the process by using all core of the CPU. On a single CPU core, it takes almost five days including day and night.

I tend to change the below code in multiprocessing. Meanwhile, I have no list of "positives_negatives.csv"

if Path("positives_negatives.csv").exists():
    df = pd.read_csv("positives_negatives.csv")
else:
    for combo in tqdm(itertools.combinations(identities.values(), 2), desc="Negatives"):
        for cross_sample in itertools.product(combo[0], combo[1]):
            negatives = negatives.append(pd.Series({"file_x": cross_sample[0], "file_y": cross_sample[1]}).T,
                                         ignore_index=True)
    negatives["decision"] = "No"
    negatives = negatives.sample(positives.shape[0])
    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)

Modified code

def multi_func(iden, negatives):
    for combo in tqdm(itertools.combinations(iden.values(), 2), desc="Negatives"):
        for cross_sample in itertools.product(combo[0], combo[1]):
            negatives = negatives.append(pd.Series({"file_x": cross_sample[0], "file_y": cross_sample[1]}).T,
                                         ignore_index=True)

Used

if Path("positives_negatives.csv").exists():
    df = pd.read_csv("positives_negatives.csv")
else:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        secs = [5, 4, 3, 2, 1]
        results = executor.map(multi_func(identities, negatives), secs)

    negatives["decision"] = "No"
    negatives = negatives.sample(positives.shape[0])
    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)

Upvotes: 1

Views: 189

Answers (1)

Khawar Islam
Khawar Islam

Reputation: 2914

The best way is to implement Process Pool Executor class and create a separate function. Like you can achieve in this way

Libraries

from concurrent.futures.process import ProcessPoolExecutor
import more_itertools
from os import cpu_count

def compute_cross_samples(x):
    return pd.DataFrame(itertools.product(*x), columns=["file_x", "file_y"])

Modified code

if Path("positives_negatives.csv").exists():
    df = pd.read_csv("positives_negatives.csv")
else:
    with ProcessPoolExecutor() as pool:
        # take cpu_count combinations from identities.values
        for combos in tqdm(more_itertools.ichunked(itertools.combinations(identities.values(), 2), cpu_count())):
            # for each combination iterator that comes out, calculate the cross
            for cross_samples in pool.map(compute_cross_samples, combos):
                # for each product iterator "cross_samples", iterate over its values and append them to negatives
                negatives = negatives.append(cross_samples)

    negatives["decision"] = "No"

    negatives = negatives.sample(positives.shape[0])
    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)

Upvotes: 1

Related Questions