Reputation: 2914
I am generating negative pairs from the positive pairs. I would like to speed up the process by using all cores of the CPU. On a single CPU core, it takes almost five days of continuous running.
I intend to convert the code below to use multiprocessing. Note that I do not have the file "positives_negatives.csv" yet, so the else branch is the one that runs.
if Path("positives_negatives.csv").exists():
    # Reuse the pair table written by a previous run.
    df = pd.read_csv("positives_negatives.csv")
else:
    # Gather every cross pair in a plain list first. Appending to a DataFrame
    # one row at a time copies the whole frame on each call (quadratic time),
    # and DataFrame.append was removed in pandas 2.0.
    negative_pairs = []
    for combo in tqdm(itertools.combinations(identities.values(), 2), desc="Negatives"):
        negative_pairs.extend(itertools.product(combo[0], combo[1]))

    # Build the frame once; rows already present in `negatives` stay in front.
    negatives = pd.concat(
        [negatives, pd.DataFrame(negative_pairs, columns=["file_x", "file_y"])],
        ignore_index=True,
    )
    negatives["decision"] = "No"
    # Keep only as many negative rows as there are positive rows.
    negatives = negatives.sample(positives.shape[0])

    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)
Modified code
def multi_func(iden, negatives):
    """Return `negatives` extended with every cross pair between groups of `iden`.

    Args:
        iden: Mapping whose values are iterables of file names
            (presumably one entry per identity; confirm against the caller).
        negatives: DataFrame the new "file_x"/"file_y" rows are added to.

    Returns:
        A new DataFrame holding the rows of `negatives` followed by one row
        per cross pair, with a fresh integer index. The argument itself is
        not modified, so the caller must use the return value.
    """
    # Collect the pairs in a list and build the frame once: row-by-row
    # DataFrame.append is quadratic and was removed in pandas 2.0.
    pairs = []
    for combo in tqdm(itertools.combinations(iden.values(), 2), desc="Negatives"):
        pairs.extend(itertools.product(combo[0], combo[1]))

    new_rows = pd.DataFrame(pairs, columns=["file_x", "file_y"])
    return pd.concat([negatives, new_rows], ignore_index=True)
Used
def _cross_pairs(combo):
    """Return every (file_x, file_y) pair between the two groups in `combo`."""
    # Must live at module level so worker processes can unpickle it.
    return list(itertools.product(combo[0], combo[1]))


# NOTE(review): on platforms that spawn worker processes (Windows, macOS) the
# pool must be created under `if __name__ == "__main__":` in the real script.
if Path("positives_negatives.csv").exists():
    df = pd.read_csv("positives_negatives.csv")
else:
    negative_pairs = []
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # executor.map takes the callable itself plus an iterable of task
        # arguments; calling the function inline would run it in this process
        # and hand its return value to the pool instead.
        combos = itertools.combinations(identities.values(), 2)
        # chunksize batches many small tasks per round-trip to a worker.
        for pairs in tqdm(executor.map(_cross_pairs, combos, chunksize=256), desc="Negatives"):
            negative_pairs.extend(pairs)

    # Build the frame once from the collected results; rows already present in
    # `negatives` stay in front.
    negatives = pd.concat(
        [negatives, pd.DataFrame(negative_pairs, columns=["file_x", "file_y"])],
        ignore_index=True,
    )
    negatives["decision"] = "No"
    # Keep only as many negative rows as there are positive rows.
    negatives = negatives.sample(positives.shape[0])

    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)
Upvotes: 1
Views: 189
Reputation: 2914
The best way is to use the ProcessPoolExecutor class and move the per-combination work into a separate function. You can achieve it like this:
Libraries
from concurrent.futures.process import ProcessPoolExecutor
import more_itertools
from os import cpu_count
def compute_cross_samples(x):
    """Build the table of all cross pairs between the groups in `x`.

    Args:
        x: Iterable of iterables; it is unpacked into `itertools.product`,
            so two groups are needed to match the two output columns.

    Returns:
        DataFrame with columns "file_x" and "file_y", one row per element of
        the Cartesian product, in `itertools.product` order.
    """
    return pd.DataFrame(itertools.product(*x), columns=["file_x", "file_y"])
Modified code
if Path("positives_negatives.csv").exists():
    # Reuse the pair table written by a previous run.
    df = pd.read_csv("positives_negatives.csv")
else:
    # Gather the per-combination frames and concatenate once at the end:
    # DataFrame.append inside the loop copies the whole frame on every call
    # and was removed in pandas 2.0.
    frames = [negatives]
    with ProcessPoolExecutor() as pool:
        # take cpu_count combinations from identities.values at a time
        for combos in tqdm(more_itertools.ichunked(itertools.combinations(identities.values(), 2), cpu_count())):
            # each worker returns the cross-sample frame for one combination
            frames.extend(pool.map(compute_cross_samples, combos))

    negatives = pd.concat(frames, ignore_index=True)
    negatives["decision"] = "No"
    # Keep only as many negative rows as there are positive rows.
    negatives = negatives.sample(positives.shape[0])

    df = pd.concat([positives, negatives]).reset_index(drop=True)
    df.to_csv("positives_negatives.csv", index=False)
Upvotes: 1