Moj

Reputation: 6361

mlflow.spark.log_model fails with sample_input

I'm trying to log my ALS model with mlflow.spark.log_model. My data looks like this (MovieLens dataset):

data = [
    Row(user_id=1, item_id=101, rating=4.0),
    Row(user_id=1, item_id=102, rating=3.5),
    Row(user_id=2, item_id=101, rating=5.0),
    Row(user_id=2, item_id=103, rating=2.0),
    Row(user_id=3, item_id=104, rating=4.5),
    Row(user_id=3, item_id=105, rating=3.0)
]

# Create a DataFrame from the list of Rows
df = spark.createDataFrame(data)
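(split_data() is a small helper I don't include here; purely as a stand-in, not my actual implementation, something like this reproduces the shape of its output:)

# Hypothetical stand-in for split_data() -- the real helper is not shown in this post
def split_data(ratio=0.8, seed=42):
    df_full_data = df  # the DataFrame created above
    df_train, df_test = df_full_data.randomSplit([ratio, 1 - ratio], seed=seed)
    fe_full_data = df_full_data  # placeholder for whatever feature table the real helper returns
    return fe_full_data, df_full_data, df_train, df_test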

This is how I train and log my model:

with mlflow.start_run(run_name="ALS_best_model") as run:
        fe_full_data, df_full_data, df_train, df_test = split_data()
        als = ALS()
        # Now we set the parameters for the method
        als.setMaxIter(MAX_ITER)\
        .setSeed(SEED)\
        .setRegParam(best_params["REG_PARAM"])\
        .setUserCol(COL_USER)\
        .setItemCol(COL_ITEM)\
        .setRatingCol(COL_LABEL)\
        .setRank(best_params["RANK"])

        mlflow.log_param("MAX_ITER", MAX_ITER)
        mlflow.log_param("RANK", best_params["RANK"])
        mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])

        # Create the model with these parameters.
        model = als.fit(df_train)
        # drop predictions for users and items that appear in the test set but not in the training set; in that case the prediction is NaN
        model.setColdStartStrategy('drop')
        predictions = model.transform(df_train)
        signature = infer_signature(model_input = df_train, model_output = predictions.select(COL_LABEL))

        #log the model
        dataset = mlflow.data.Dataset(df_train)
        #mlflow.log_input(dataset, "training")
        mlflow.spark.log_model(model, model_name,
                               sample_input=df_train.limit(20),
                               signature=signature,
                               conda_env=mlflow.spark.get_default_conda_env(),
                               registered_model_name=f"{catalog_name}.{model_schema}.{model_name}")
        mlflow.log_text("\n".join(df_train.limit(3).toJSON().collect()), "sample_input.json")

        #mlflow.spark.log_model(model, "model")    
        evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
        rmse = evaluator.setMetricName("rmse").evaluate(predictions)
        mlflow.log_metric('rmse', rmse)

I also tried sample_input=df_train.limit(20).toPandas(); both variants throw this error:

ModuleNotFoundError: No module named 'mleap'
File <command-2685032901804064>, line 27
     25 dataset = mlflow.data.Dataset(df_train)
     26 #mlflow.log_input(dataset, "training")
---> 27 mlflow.spark.log_model(model, model_name,
     28                        sample_input=sample_input,
     29                        signature = signature,
     30                        conda_env=mlflow.spark.get_default_conda_env(),
     31                        registered_model_name=f"{catalog_name}.{model_schema}.{model_name}")
     33 #mlflow.spark.log_model(model, "model")    
     34 evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
File /databricks/python/lib/python3.11/site-packages/mlflow/mleap/__init__.py:220, in add_to_model(mlflow_model, path, spark_model, sample_input)
    206 @deprecated(alternative="mlflow.onnx", since="2.6.0")
    207 @keyword_only
    208 def add_to_model(mlflow_model, path, spark_model, sample_input):
    209     """
    210     Add the MLeap flavor to an existing MLflow model.
    211 
   (...)
    218             required by MLeap for data schema inference.
    219     """
--> 220     import mleap.version
    222     # This import statement adds `serializeToBundle` and `deserializeFromBundle` to `Transformer`:
    223     # https://github.com/combust/mleap/blob/37f6f61634798118e2c2eb820ceeccf9d234b810/python/mleap/pyspark/spark_support.py#L32-L33
    224     from mleap.pyspark.spark_support import SimpleSparkSerializer  # noqa: F401

When I remove the sample_input parameter, everything works fine. I managed to work around the issue by converting the training data to a list of record dicts, input_example=df_train.limit(20).toPandas().to_dict(orient="records"), but I'm not sure this is the best way to do it.
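
For reference, the call that works for me looks roughly like this (same arguments as above, just input_example instead of sample_input):

# Workaround: pass a plain Python structure as input_example instead of
# sample_input (sample_input is what pulls in the MLeap flavor)
input_example = df_train.limit(20).toPandas().to_dict(orient="records")

mlflow.spark.log_model(model, model_name,
                       input_example=input_example,
                       signature=signature,
                       conda_env=mlflow.spark.get_default_conda_env(),
                       registered_model_name=f"{catalog_name}.{model_schema}.{model_name}")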

I use a Databricks cluster with runtime 15.3 (Spark 3.5, Scala 2.12).

Upvotes: 0

Views: 23

Answers (0)
