CenturyGhost

Reputation: 11

Why do I have a label problem when using CrossValidator?

I am trying to use CrossValidator. My model is as follows:

Training

# Training data - several splits have been tested; 50/50 seems the best
(trainData, testData) = modelData.randomSplit([0.5, 0.5])

# Count the rows used on each side of the split
print("Training dataset count: " + str(trainData.count()))
print("Test dataset count: " + str(testData.count()))
trainData.cache()
testData.cache()

Model

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'v4_Indexer', maxIter = 5)
lrModel = lr.fit(trainData)
predictions = lrModel.transform(testData)
predictions.select('v4_Indexer', 'features', 'rawPrediction', 'prediction', 'probability').toPandas().head(2500)

I tried this code for cross-validation:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[lr])
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0, 0.5, 1])
             .addGrid(lr.elasticNetParam, [0, 0.5, 1])
             .addGrid(lr.maxIter, [1, 10])
             .build())
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(trainData)
trainingSummary = cvModel.bestModel

I have a warning:

/databricks/spark/python/pyspark/ml/util.py:92: UserWarning: CrossValidator_7ba8c8c903af fit call failed but some spark jobs may still running for unfinished trials. To address this issue, you should enable pyspark pinned thread mode.

And an error:

IllegalArgumentException: label does not exist. Available: v4_Indexer, features, CrossValidator_7ba8c8c903af_rand

This model worked for a while; I do not understand why it does not anymore.
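
A likely cause, judging by the error message: the evaluator passed to CrossValidator is not shown above, and a PySpark evaluator looks for a column named "label" by default, which this data does not have. A minimal sketch of that guess, naming the label column explicitly:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Hypothetical fix: point the evaluator at the actual label column
# instead of relying on the default labelCol="label".
evaluator = MulticlassClassificationEvaluator(labelCol='v4_Indexer',
                                              predictionCol='prediction',
                                              metricName='weightedPrecision')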

Upvotes: 0

Views: 421

Answers (1)

CenturyGhost

Reputation: 11

I've solved the issue by rewriting my code entirely. This is what it looks like now (prerequisite: %pip install mlflow):

from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# StringIndexer: encode the label column "v4_Indexer" into indexed categorical values
indexer = StringIndexer(inputCol="v4_Indexer", outputCol="indexedLabel")

# Create an evaluator.  In this case, use "weightedPrecision".
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="v4_Indexer", metricName="weightedPrecision")

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# DecisionTreeClassifier: Learn to predict column "indexedLabel" using the "features" column

dtc = DecisionTreeClassifier(labelCol="indexedLabel")

# Chain indexer + dtc together into a single ML Pipeline
pipeline = Pipeline(stages=[indexer, dtc])
# Define the parameter grid to examine.
grid = ParamGridBuilder().addGrid(dtc.maxDepth, [2, 3, 4, 5, 6, 7, 8]).addGrid(dtc.maxBins, [2, 4, 8]).build()
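# Note: this grid has 7 x 3 = 21 parameter combinations; with numFolds=3 below,
# CrossValidator fits 63 models before refitting the best one on the full training set.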

# Create a cross validator, using the pipeline, evaluator, and parameter grid you created in previous steps.
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3)

# Explicitly create a new run.
# This allows this cell to be run multiple times.
# If you omit mlflow.start_run(), then this cell could run once, but a second run would hit conflicts when attempting to overwrite the first run.
import mlflow
import mlflow.spark

with mlflow.start_run():
    # Run the cross-validation on the training dataset.
    # cv.fit() returns a CrossValidatorModel that wraps the best model it found.
    cvModel = cv.fit(trainData)
    
    # Evaluate the best model's performance on the test dataset and log the result.
    test_metric = evaluator.evaluate(cvModel.transform(testData))
    mlflow.log_metric('test_' + evaluator.getMetricName(), test_metric)

    # Log the best model.
    mlflow.spark.log_model(spark_model=cvModel.bestModel, artifact_path='best-model')
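
Once the run completes, the returned CrossValidatorModel can also be inspected directly. A minimal sketch (bestDtc is a name introduced here for illustration):

# The last pipeline stage of the best model is the fitted DecisionTreeClassificationModel.
bestDtc = cvModel.bestModel.stages[-1]
print(bestDtc.getOrDefault('maxDepth'), bestDtc.getOrDefault('maxBins'))

# Average cross-validation metric for each point in the parameter grid.
print(cvModel.avgMetrics)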

Upvotes: 0
