sparkly

Reputation: 11

How can I adjust the threshold in GBTClassifier in PySpark?

It seems that there is no way to set the threshold on the GBTClassifier model in PySpark. The parameter does appear in explainParams(), but it is not exposed in the constructor or through a setter. If it can be set, please advise how; if not, how can I adjust my model to better predict the classes in a binary classification problem?

Upvotes: 1

Views: 1075

Answers (2)

Chappy Hickens

Reputation: 445

For people coming back to this question with more recent versions of PySpark (e.g., 3.3.2): after fitting, you can take your GBTClassificationModel object and call setThresholds([a, b]). Here 'a' corresponds to label 0 and 'b' to label 1. To threshold on the usual probability that the label takes the value 1, with 'p' as your chosen threshold, set a and b as follows:

a = 1; b = p/(1-p)
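Why this works: per the Spark docs for the thresholds param, the predicted class is the one with the largest value of probability[i] / threshold[i]. A quick pure-Python sanity check of that rule (spark_prediction here is a hypothetical helper mimicking Spark's documented behavior, not a Spark API):

```python
def spark_prediction(probabilities, thresholds):
    # Spark predicts the class i with the largest probability[i] / threshold[i]
    scaled = [prob / t for prob, t in zip(probabilities, thresholds)]
    return scaled.index(max(scaled))

p = 0.7  # chosen probability threshold for class 1
thresholds = [1.0, p / (1 - p)]

# With thresholds [1, p/(1-p)], class 1 wins exactly when P(label=1) > p:
# prob1 / (p/(1-p)) > (1-prob1) / 1  simplifies to  prob1 > p.
for prob1 in [0.65, 0.69, 0.71, 0.90]:
    pred = spark_prediction([1 - prob1, prob1], thresholds)
    assert pred == (1 if prob1 > p else 0)
```
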

Here's a working example:

import pandas as pd
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline, PipelineModel

spark = SparkSession.builder.getOrCreate()

n = 1000; k = 10
# totally random data
pdf = pd.DataFrame({**{f'x{i}': np.random.randn(n) for i in range(k)},
                    **{'label': np.random.randint(0, 2, n)}})
s_pdf = spark.createDataFrame(pdf).sort('label')  # ensure 0 is the first value so 0 is the first label
model = Pipeline(stages=[
    VectorAssembler(inputCols=[f'x{i}' for i in range(k)], outputCol='features'),
    GBTClassifier()]).fit(s_pdf)
second_element = F.udf(lambda v: float(v[1]), 'float')  # manually extract P(label=1) from the probability vector

result = model.transform(s_pdf)
summary = None
for p in [0.3, 0.5, 0.7]:
    # approach 1: threshold the extracted probability manually
    grouped_result = (result
        .withColumn('probability', second_element('probability'))
        .groupBy(F.expr(f'case when probability >= {p} then 1 else 0 end as prediction'))
        .count()
        .withColumn('probability_threshold', F.lit(p))
        .withColumn('threshold_approach', F.lit('manual')))
    if summary is None:
        summary = grouped_result
    else:
        summary = summary.unionByName(grouped_result)
    # approach 2: setThresholds on a copy of the fitted classifier
    clf = model.stages[1].copy()  # copy is necessary or all clf's will share the last threshold in the loop due to lazy eval
    clf.setThresholds([1, p / (1 - p)])
    result2 = PipelineModel(stages=list(model.stages[:-1]) + [clf]).transform(s_pdf)
    grouped_result2 = (result2
        .withColumn('probability', second_element('probability'))
        .groupBy('prediction')
        .count()
        .withColumn('probability_threshold', F.lit(p))
        .withColumn('threshold_approach', F.lit('setThresholds')))
    summary = summary.unionByName(grouped_result2)
print(summary.sort('probability_threshold', 'prediction', 'threshold_approach').toPandas())

And the output will look something like this:

[screenshot: output dataframe]

Upvotes: 0
