Trodenn

Reputation: 17

alternatives to tolist() for pyspark pandas (pandas api)

We have code that handles customer orders from a retail website. The volume of data is fairly large and keeps growing day by day. The tricky part is that this data is in a different language, so we use sentence transformers to process it, but that adds a whole other level of performance overhead. The code now won't even finish running before the Spark cluster is terminated, either from executor loss or a stage materialization failure. We use a combination of Spark and pyspark.pandas. The goal is to leverage Spark's computational power for processing big data, but it seems we haven't executed this well. From what I can see right now, one thing that directly consumes a lot of memory in the code is the use of tolist(). I was wondering if anyone could tell me whether there is an alternative that achieves the same thing without loading everything into memory the way tolist() does.

The code is as below:

# imports added for readability; constants, tools, thai_transformer, Transformer,
# block_list_cn and get_revised_keywords come from internal modules not shown here
import numpy as np
import pyspark.pandas as ps
from sentence_transformers import SentenceTransformer


def get_product_keyword(
    df: ps.DataFrame,
    bu: constants.BU,
) -> ps.DataFrame:
    keywords = tools.get_keywords(bu, en=False)
    if bu == constants.BU.TH:  # placeholder condition: the original if-line is missing from this snippet
        # from SentenceTransformer for thai_transformer.extractRelevantThaiwords
        embedder = SentenceTransformer("mrp/simcse-model-m-bert-thai-cased")
        # key_phrase_extract stays the same and use transformer_th to get the
        # keywords
        keyword_dict = thai_transformer.extractRelevantThaiwords(
            df, th_model=embedder, parse="dataframe"
        )
        new_dict = {}
        for keyword, product_data in keyword_dict.items():
            product_list = list(product_data.values())[0][-1]
            for product_code in product_list:
                if product_code not in new_dict:
                    new_dict[product_code] = [keyword]
                else:
                    new_dict[product_code].append(keyword)

        revised_df = get_revised_keywords().toPandas()
        df = df.to_pandas()
        revised_dict = {}

        for index, row in revised_df.iterrows():
            key = row["key"]
            value = row["value"]

            revised_dict[key] = value

        result_dict = {}

        for product_code, keywords_list in new_dict.items():
            revised_keywords_list = []

            for keyword in keywords_list:
                if keyword in revised_dict:
                    revised_keyword = revised_dict[keyword]
                    revised_keywords_list.append(revised_keyword)
                else:
                    revised_keywords_list.append(keyword)
            result_dict[product_code] = revised_keywords_list

        # once result_dict is complete, build the keyword list for each row
        def assign_keywords(row):
            product_code = row["product_code"]
            if product_code not in result_dict:
                return []
            keywords = result_dict[product_code]
            return list(set(keywords))

        df["keywords_extract"] = df.apply(assign_keywords, axis=1)
        df = ps.from_pandas(df)

        return df
    else:
        df["key_phrase_extract"] = df["key_phrase_extract"].apply(lambda x: x.split())

    try:
        targets = np.concatenate(df["key_phrase_extract"].tolist()).tolist()
    except Exception:
        targets = []

    tf = Transformer(lang=constants.Language.zh_hk, keywords=keywords)
    target_map = tf.get_transform_dict(bu, targets, execute_cache=False)
    df["keywords_extract"] = df["key_phrase_extract"].apply(
        lambda x: list(
            set(
                target_map[word]
                for word in x
                if (word in target_map) and (len(target_map[word]) > 1)
            )
        )
    )
    # use a lambda function to remove words in block_list_cn from keywords_extract
    df["keywords_extract"] = df["keywords_extract"].apply(
        lambda x: [word for word in x if word not in block_list_cn]
    )
    return df


The code here is just to give a rough idea of how the process works. I personally think that if we can achieve the same result without using tolist(), performance should already improve by a lot. Any advice is welcome! Thanks.
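
One idea I have been toying with is to keep the column on the Spark side and only collect the distinct words, roughly as in the sketch below, but I'm not sure it is actually equivalent (it dedupes the words, which I think is fine for building target_map) or that it solves the memory problem:

# rough, untested idea: keep key_phrase_extract in Spark and only collect the
# distinct words, instead of materialising every row's list on the driver with
# np.concatenate(df["key_phrase_extract"].tolist()).tolist()
from pyspark.sql import functions as F

spark_df = df.to_spark()  # df is the pyspark.pandas DataFrame from above
distinct_words = (
    spark_df
    # assumes key_phrase_extract has already been split into an array of words
    .select(F.explode("key_phrase_extract").alias("word"))
    .distinct()
)
targets = [row["word"] for row in distinct_words.collect()]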

Upvotes: 0

Views: 54

Answers (1)

user238607

Reputation: 2468

It seems from the code that there is no interaction between the different rows of the dataframe. If that is true, then you can use ideas from this answer, which I wrote a few days ago.

Take a look at the mapPartition_inference function, which runs at the partition level so that you don't have to collect everything on one executor. It can run over all the partitions at the same time. Then you can collect the results and do further aggregation on them.

https://stackoverflow.com/a/77033826/3238085

Relevant snippet is below:

import numpy as np
import pandas as pd

# `all_columns` and `broadcasted_model_clone_list` are defined in the linked answer

def mapPartition_inference(partitioned_rows):
    # gather this partition's feature rows into a local list
    features_array_list = []
    for row in partitioned_rows:
        features_array_list.append(row.features_concat)

    X = pd.DataFrame(features_array_list)
    X.columns = all_columns

    prediction_classification_scores_list = []
    # inference over all 10 models in a for loop
    for model_ii in broadcasted_model_clone_list.value:
        curr_result = model_ii.predict(X)
        prediction_classification_scores_list.append(curr_result[:, 0])

    prob_scores = np.array(prediction_classification_scores_list)
    final_result = pd.Series(prob_scores.mean(axis=0)).tolist()
    print("Hooray are we here!!!!!!!!!!!!!")

    constructed_result = []
    for jj in range(len(features_array_list)):
        constructed_result.append([features_array_list[jj], final_result[jj]])

    return iter(constructed_result)


partitioned_df = X_train_df.limit(30).repartition(10)

partition_predicted_df = partitioned_df.rdd.mapPartitions(mapPartition_inference).toDF(["features_concat", "prediction_avg_scores"])
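
For your specific case, the same pattern should let you run the SentenceTransformer inside each partition instead of collecting anything to the driver with tolist(). Below is a rough, untested sketch: the model name and the key_phrase_extract / product_code columns are taken from your snippet, and I'm assuming key_phrase_extract is still a plain text column at that point.

from sentence_transformers import SentenceTransformer

def mapPartition_embed(partitioned_rows):
    # load the model once per partition (or broadcast it, as in the linked answer)
    embedder = SentenceTransformer("mrp/simcse-model-m-bert-thai-cased")

    rows = list(partitioned_rows)
    phrases = [row["key_phrase_extract"] for row in rows]

    # one batched encode call per partition instead of one call per row
    embeddings = embedder.encode(phrases, batch_size=64)

    for row, emb in zip(rows, embeddings):
        # each embedding is converted to a small Python list on the executor,
        # so nothing large is ever collected on the driver
        yield (row["product_code"], row["key_phrase_extract"], emb.tolist())

spark_df = df.to_spark()  # df is your pyspark.pandas DataFrame
embedded_df = (
    spark_df.rdd
    .mapPartitions(mapPartition_embed)
    .toDF(["product_code", "key_phrase_extract", "embedding"])
)

From there you can join embedded_df back into the rest of the pipeline, or do the keyword aggregation with Spark operations instead of Python dicts.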

Let me know if you don't understand anything or are having trouble adapting the solution. Always happy to help.

Upvotes: 0
