databricks/spark/python/pyspark/serializers.py AttributeError: 'str' object has no attribute 'get'

When executing the following code provided by Databricks, a serialization error appears. The code is basically a Hyperopt optimization of XGBoost in the Databricks environment. It is part of an end-to-end tutorial provided by Databricks.

Code:

from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
from math import exp
import mlflow.xgboost
import numpy as np
import xgboost as xgb
from mlflow.models.signature import infer_signature
from sklearn.metrics import roc_auc_score
 
pyspark.InheritableThread  
#mlflow.set_experiment("/Shared/experiments/ichi")
search_space = {
  'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
  'learning_rate': hp.loguniform('learning_rate', -3, 0),
  'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
  'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
  'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
  'objective': 'binary:logistic',
  'seed': 123, # Set a seed for deterministic training
}
 
def train_model(params):
  # With MLflow autologging, hyperparameters and the trained model are automatically logged to MLflow.
  mlflow.xgboost.autolog()
  with mlflow.start_run(nested=True):
    train = xgb.DMatrix(data=X_train, label=y_train)
    validation = xgb.DMatrix(data=X_val, label=y_val)
    # Pass in the validation set so xgb can track an evaluation metric. XGBoost terminates training when the evaluation metric
    # is no longer improving.
    booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,
                        evals=[(validation, "validation")], early_stopping_rounds=50)
    validation_predictions = booster.predict(validation)
    auc_score = roc_auc_score(y_val, validation_predictions)
    mlflow.log_metric('auc', auc_score)
 
    signature = infer_signature(X_train, booster.predict(train))
    mlflow.xgboost.log_model(booster, "model", signature=signature)
    
    # Set the loss to -1*auc_score so fmin maximizes the auc_score
    return {'status': STATUS_OK, 'loss': -1*auc_score, 'booster': booster.attributes()}
 
# Greater parallelism will lead to speedups, but a less optimal hyperparameter sweep. 
# A reasonable value for parallelism is the square root of max_evals.
spark_trials = SparkTrials(parallelism=10)
 
# Run fmin within an MLflow run context so that each hyperparameter configuration is logged as a child run of a parent
# run called "xgboost_models" .
with mlflow.start_run(run_name='xgboost_models'):
  best_params = fmin(
    fn=train_model, 
    space=search_space, 
    algo=tpe.suggest, 
    max_evals=96,
    trials=spark_trials,
  )

The error is:

/databricks/spark/python/pyspark/rdd.py:980: FutureWarning: Deprecated in 3.1, Use pyspark.InheritableThread with the pinned thread mode enabled.
  warnings.warn(

  0%|          | 0/96 [00:00<?, ?trial/s, best loss=?]trial task 0 failed, exception is Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/databricks/python/lib/python3.8/site-packages/mlflow/exceptions.py", line 83, in __init__
    error_code = json.get("error_code", ErrorCode.Name(INTERNAL_ERROR))
AttributeError: 'str' object has no attribute 'get'
.
 None

This code is executed in a Databricks notebook. I tried different versions of mlflow, pyspark and hyperopt, but without success.

Upvotes: 3

Views: 1238

Answers (1)

Martin Fridrich

Reputation: 344

SparkTrials automatically tracks its runs, and that clashes with the tracking you set up explicitly in the train_model function. Just remove all of the mlflow calls from train_model and you are good to go.
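As a rough sketch of what that could look like, here is the objective from the question with the mlflow lines taken out. It assumes X_train, y_train, X_val and y_val are already defined in the notebook, as in the question. The run_search wrapper is a hypothetical helper added only for illustration, and the imports sit inside the functions just so the sketch can be defined on its own (module-level imports work equally well).

```python
def train_model(params):
    """Objective for fmin with no mlflow calls; SparkTrials does the tracking."""
    # Imported here so the sketch can be defined without the libraries loaded.
    import xgboost as xgb
    from hyperopt import STATUS_OK
    from sklearn.metrics import roc_auc_score

    # X_train, y_train, X_val, y_val are assumed to be notebook globals,
    # exactly as in the question.
    train = xgb.DMatrix(data=X_train, label=y_train)
    validation = xgb.DMatrix(data=X_val, label=y_val)

    # Early stopping on the validation set, unchanged from the question.
    booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,
                        evals=[(validation, "validation")],
                        early_stopping_rounds=50)
    auc_score = roc_auc_score(y_val, booster.predict(validation))

    # Negative AUC as the loss, so that minimizing it maximizes AUC.
    return {'status': STATUS_OK, 'loss': -1 * auc_score,
            'booster': booster.attributes()}


def run_search(search_space, max_evals=96, parallelism=10):
    """Hypothetical wrapper: run the sweep with SparkTrials."""
    from hyperopt import fmin, tpe, SparkTrials

    spark_trials = SparkTrials(parallelism=parallelism)
    return fmin(fn=train_model, space=search_space, algo=tpe.suggest,
                max_evals=max_evals, trials=spark_trials)
```

In the notebook this would be called as best_params = run_search(search_space). Only train_model changes here; the outer with mlflow.start_run(run_name='xgboost_models') block around fmin is not inside train_model, so it is not part of what this suggestion asks you to remove.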

Upvotes: 4
