Wingnut
Wingnut

Reputation: 45

Apply scikit-learn model to pyspark dataframe column

I have a trained Scikit-learn LogisticRegression model in a sklearn.pipeline.Pipeline. This is an NLP task. The model is saved as a pkl file (actually in ML Studio models, but I download it to databricks dbfs).

I have a Hive table (delta-backed) containing some 1 million rows. The rows have, amongst other things, an id, a keyword_context column (containing the text), a modelled column (boolean, indicates the model has been run on this row), and a prediction column, which is an integer for the class output by the logistic regression.

My problem is how to update the prediction column.

running locally I can do

def generatePredictions(data:pd.DataFrame, model:Pipeline) -> pd.DataFrame:
  data.loc[:, 'keyword_context'] = data.keyword_context.apply(lambda x: x.replace("\n", " ")
  data['prediction'] = model.predict(data.keyword_context)
  data['modelled'] = True
  return data

This actually runs fast enough (~20s), but running the UPDATEs back to databricks via the databricks.sql.connector, takes many hours. So I want to do the same in a pyspark notebook to bypass the lengthy upload.

The trouble is that it is generally suggested to use inbuilt functions (which this isn't) or if there must be a udf then the examples all use inbuilt types, not Pipelines. I'm wondering whether the model should be loaded within the function, and I presume the function takes a single row, which means a lot of loading. I'm really not sure how to code the function, or call it.

Upvotes: 0

Views: 1951

Answers (1)

Kevin Kho
Kevin Kho

Reputation: 687

I work on the Fugue project which aims to provide a simpler interface than the Spark one for porting Python/Pandas code. This is actually the first use case in our tutorial. Fugue will use the underlying Spark call (pandas_udf, udf, mapPartitions, applyInPandas, mapInPandas) based on the arguments you provide with minimal overhead.

Here is what the code looks like.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

X = pd.DataFrame({"x_1": [1, 1, 2, 2], "x_2":[1, 2, 2, 3]})
y = np.dot(X, np.array([1, 2])) + 3
reg = LinearRegression().fit(X, y)

def predict(df: pd.DataFrame, model: LinearRegression) -> pd.DataFrame:
    return df.assign(predicted=model.predict(df))

input_df = pd.DataFrame({"x_1": [3, 4, 6, 6], "x_2":[3, 3, 6, 6]})

sdf = spark.createDataFrame(input_df)

# This is the start of the Fugue portion. It's minimal
from fugue import transform

result = transform(
    sdf,
    predict,
    schema="*,predicted:double",
    params=dict(model=reg),
    engine=spark
)
print(type(result))
result.show()

This code will be applied per partition. Schema is a requirement for Spark. I am not sure but it sounds like you were using a row-wise UDF, so I think this will be faster. It also leaves your logic easy to unit test because you don't need Spark to unit test.

On loading the file inside the function

If you load the file inside the function, it gets executed on the workers. If you pass it in, it gets passed through the scheduler. This can create a lot of redundant passing of data. Loading it inside might speed things up.

Upvotes: 1

Related Questions