Moj

Reputation: 6363

FeatureEngineeringClient failing to run inference with mlflow.spark flavor

I am using the Databricks FeatureEngineeringClient to log my spark.ml model for batch inference. I use an ALS model on the MovieLens dataset. My ratings dataset has three columns: user_id, item_id and rating.

Here is my code to prepare the dataset:

    fe_data = fe.create_training_set(
        df=df_ratings,
        feature_lookups=model_feature_lookups,
        label=label,
        exclude_columns=["rating_date_month", "rating_date_dayofmonth", "timestamp"],
    )
    df_data = fe_data.load_df()
    df_data = df_data.na.drop()
    (df_train, df_test) = df_data.randomSplit([0.75, 0.25], SEED)

Note: both df_ratings and my feature table contain the item_id and user_id columns.
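
For context, model_feature_lookups is built roughly like this; the feature table name and lookup keys below are placeholders, not my exact code:

    from databricks.feature_engineering import FeatureLookup

    # Placeholder feature table name and keys -- the real definition differs
    model_feature_lookups = [
        FeatureLookup(
            table_name=f"{catalog_name}.{model_schema}.movie_features",  # hypothetical table
            lookup_key=["user_id", "item_id"],
        )
    ]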

Then I train and log my model as follows:


    from pyspark.ml.recommendation import ALS
    from pyspark.ml.evaluation import RegressionEvaluator

    import mlflow
    from mlflow.models.signature import infer_signature
    from mlflow.tracking.client import MlflowClient
    from databricks.feature_engineering import FeatureEngineeringClient

    mlflow.set_registry_uri("databricks-uc")
    fe = FeatureEngineeringClient()

    best_params = {}
    best_params["REG_PARAM"] = 0.01
    best_params["RANK"] = 2

    with mlflow.start_run(run_name="ALS_final_model") as run:
        fe_full_data, df_full_data, df_train, df_test = split_data()

        als = ALS()
        als.setMaxIter(MAX_ITER) \
            .setSeed(SEED) \
            .setRegParam(best_params["REG_PARAM"]) \
            .setUserCol(COL_USER) \
            .setItemCol(COL_ITEM) \
            .setRatingCol(COL_LABEL) \
            .setRank(best_params["RANK"])

        mlflow.log_param("MAX_ITER", MAX_ITER)
        mlflow.log_param("RANK", best_params["RANK"])
        mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])

        model = als.fit(df_train)
        model.setColdStartStrategy('drop')
        predictions = model.transform(df_train)

        # Log the model with feature-store metadata and register it in Unity Catalog
        model_info = fe.log_model(
            model=model,
            artifact_path=model_name,
            flavor=mlflow.spark,
            training_set=fe_full_data,
            conda_env=mlflow.spark.get_default_conda_env(),
            registered_model_name=f"{catalog_name}.{model_schema}.{model_name}_fs",
        )

        evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
        rmse = evaluator.setMetricName("rmse").evaluate(predictions)
        mlflow.log_metric('rmse', rmse)
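
The constants and the split_data() helper used above are defined earlier in the notebook (split_data() wraps the data-preparation code shown before). For reference, they look roughly like this; all values below are placeholders:

    # Placeholder values -- the real notebook defines these elsewhere
    COL_USER = "user_id"
    COL_ITEM = "item_id"
    COL_LABEL = "rating"
    COL_PRED = "prediction"   # default output column of ALS
    MAX_ITER = 10
    SEED = 42

    catalog_name = "my_catalog"    # hypothetical Unity Catalog catalog
    model_schema = "my_schema"     # hypothetical schema
    model_name = "als_movielens"   # hypothetical model name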
 

After that I register my model in Unity Catalog. For batch inference I use the following code:

    model_uri = f"{catalog_name}.{model_schema}.{model_name}_fs"
    model_version_uri = f"models:/{model_uri}@champion"
    predictions_df = fe.score_batch(model_uri=model_version_uri, df=df_train)
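
(For reference, the @champion alias used above was assigned to the registered model version beforehand, roughly like this; the version number is a placeholder:)

    from mlflow.tracking import MlflowClient

    client = MlflowClient()
    # Point the "champion" alias at the version produced by fe.log_model (version is a placeholder)
    client.set_registered_model_alias(
        name=f"{catalog_name}.{model_schema}.{model_name}_fs",
        alias="champion",
        version=1,
    )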

Here I get the following warning:

    2024/08/14 12:09:18 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.
    2024/08/14 12:09:18 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'

First question: it seems MLflow is loading the model as a pyfunc flavor although I registered it with the spark flavor. Why is that?

When I try to run display(predictions_df) I get the following error (truncated log):

    Traceback (most recent call last):
      File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
        return self.loads(obj)
      File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
        return cloudpickle.loads(obj, encoding=encoding)
    OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/databricks/spark/python/pyspark/worker.py", line 1964, in main
        func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
      File "/databricks/spark/python/pyspark/worker.py", line 1631, in read_udfs
        arg_offsets, udf = read_single_udf(
      File "/databricks/spark/python/pyspark/worker_util.py", line 70, in read_command
        command = serializer._read_with_length(file)
    pyspark.serializers.SerializationError: Caused by OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'

    File "<command-xxxxxx>", line 1, in <module>
        predictions_df

It seems I am facing an issue when running a batch scoring job using PySpark on Databricks: the job throws an OSError due to an I/O failure while deserializing objects. The error occurs when cloudpickle tries to load objects and leads to a SerializationError.

Upvotes: 0

Views: 59

Answers (0)
