PolarBear10

Reputation: 2305

Passing variables between stages in custom ML pipeline

I want to drop some columns from a dataframe and then apply an ML algorithm. So far I have done this with two separate pipelines. My question is: how can I combine the two pipelines into a single pipeline instead?

#######################
from typing import Iterable
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
#######################

# Custom Class
#######################
class ColumnDropper_test(Transformer):
    """Drops every column whose name contains any of the banned substrings."""
    def __init__(self, banned_list: Iterable[str]):
        super().__init__()
        self.banned_list = banned_list

    def _transform(self, df: DataFrame) -> DataFrame:
        # Drop any column whose name contains one of the banned substrings
        df = df.drop(
            *[x for x in df.columns if any(y in x for y in self.banned_list)])

        return df
#######################

# Sample Data
#######################
data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'banned_me': [14, 15, 16, 17],
    'target': [21, 31, 41, 51]
})

df = spark.createDataFrame(data)
#######################

# First Pipeline
#######################
column_dropper = ColumnDropper_test(banned_list=['banned_me'])

model = Pipeline(stages=[column_dropper]).fit(df).transform(df)  # note: a transformed DataFrame, despite the name
#######################

# Second Pipeline (Question: add the block of code below to the pipeline above)
#########################

ready = [col for col in model.columns if col != 'target']
assembler = VectorAssembler(inputCols=ready, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='target')

model_2 = Pipeline(stages=[assembler,dtc])

train_data, test_data = model.randomSplit([0.5,0.5])
fit_model = model_2.fit(train_data)
results = fit_model.transform(test_data)   
results.select('features','Prediction').show()

The challenge is the variable ready in the code above. After column_dropper runs, model.columns contains fewer columns, so building the assembler from df.columns and adding it to the same pipeline raises the following error: banned_me has already been removed from the original data by the time the assembler stage runs.

#Combining both Pipelines failed attempt
model = Pipeline(stages=[column_dropper,assembler,dtc]).fit(df).transform(df)

An error occurred while calling o188.transform. : java.lang.IllegalArgumentException: Field "banned_me" does not exist. Available fields: ball_column, keep_column, hall_column, target

My initial idea is to create a new class that inherits from the ColumnDropper_test class and carries the new df.columns. How can we make the assembler stage in the Pipeline look at the new df produced by the column_dropper stage instead of the original df?

Upvotes: 3

Views: 1531

Answers (1)

Scratch'N'Purr

Reputation: 10419

You'll have to create a custom class that inherits from VectorAssembler and sets the inputCols automatically at transform time:

from pyspark import keyword_only

class CustomVecssembler(VectorAssembler):
    @keyword_only
    def __init__(self, outputCol='features'):
        super(CustomVecssembler, self).__init__()
        # Inner assembler whose inputCols are resolved at transform time
        self.transformer = VectorAssembler(outputCol=outputCol)
        # Spark 2.1 stores the keyword_only kwargs on the method itself;
        # later versions store them on the instance
        if spark.version.startswith('2.1'):
            kwargs = self.__init__._input_kwargs
        else:
            kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, outputCol='features'):
        if spark.version.startswith('2.1'):
            kwargs = self.__init__._input_kwargs
        else:
            kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        # Resolve the feature columns against the dataframe this stage
        # actually receives, i.e. after column_dropper has already run
        ready = [col for col in df.columns if col != 'target']
        self.setInputCols(ready)
        self.transformer.setInputCols(ready)
        df = self.transformer.transform(df)
        return df
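For comparison, the same transform-time resolution can be written without the keyword_only plumbing by wrapping an assembler in a plain Transformer, just as the question's ColumnDropper_test does. This is only a sketch: the DynamicAssembler name and the labelCol parameter are illustrative additions, not anything from pyspark itself.

from pyspark.ml import Transformer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import DataFrame

class DynamicAssembler(Transformer):
    """Assembles every column except the label into a feature vector,
    resolving the input columns only when the stage actually runs."""
    def __init__(self, labelCol='target', outputCol='features'):
        super().__init__()
        self.labelCol = labelCol
        self.outputCol = outputCol

    def _transform(self, df: DataFrame) -> DataFrame:
        # Resolve inputCols against the dataframe this stage receives,
        # i.e. after column_dropper has already removed its columns
        ready = [c for c in df.columns if c != self.labelCol]
        return VectorAssembler(inputCols=ready,
                               outputCol=self.outputCol).transform(df)

Either version drops into the combined pipeline. Note that, like ColumnDropper_test, neither class implements MLWritable, so a pipeline containing them cannot be persisted with save() without extra work. The validation below uses the CustomVecssembler.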

Validating that the CustomVecssembler works:

# prep dataset
data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'banned_me': [14, 15, 16, 17],
    'target': [21, 31, 41, 51]
})
df = spark.createDataFrame(data)

# ORIGINAL IMPLEMENTATION
column_dropper = ColumnDropper_test(banned_list=['banned_me'])
model = Pipeline(stages=[column_dropper]).fit(df).transform(df)

ready = [col for col in model.columns if col != 'target']
assembler = VectorAssembler(inputCols=ready, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='target')

model_2 = Pipeline(stages=[assembler, dtc])

train_data, test_data = model.randomSplit([0.5, 0.5])
fit_model = model_2.fit(train_data)
results = fit_model.transform(test_data)
results.select('features','Prediction').show()

# +--------------+----------+
# |      features|Prediction|
# +--------------+----------+
# |[1.0,15.0,8.0]|      51.0|
# |[2.0,16.0,9.0]|      51.0|
# +--------------+----------+

# USING CUSTOM VEC ASSEMBLER
new_assembler = CustomVecssembler(outputCol='features')
new_pipeline = Pipeline(stages=[column_dropper, new_assembler, dtc]).fit(train_data)
new_results = new_pipeline.transform(test_data)
new_results.select('features', 'Prediction').show()

# +--------------+----------+
# |      features|Prediction|
# +--------------+----------+
# |[1.0,15.0,8.0]|      51.0|
# |[2.0,16.0,9.0]|      51.0|
# +--------------+----------+
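Because the input columns are only resolved inside _transform, the combined pipeline can also be fitted directly on the raw df that still contains banned_me. A quick sketch (the seed is arbitrary):

raw_train, raw_test = df.randomSplit([0.5, 0.5], seed=42)
end_to_end = Pipeline(stages=[column_dropper, new_assembler, dtc]).fit(raw_train)
end_to_end.transform(raw_test).select('features', 'prediction').show()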

Upvotes: 3
