I'm trying to log my ALS model with mlflow.spark.log_model. My data looks like this (MovieLens dataset):
from pyspark.sql import Row

data = [
    Row(user_id=1, item_id=101, rating=4.0),
    Row(user_id=1, item_id=102, rating=3.5),
    Row(user_id=2, item_id=101, rating=5.0),
    Row(user_id=2, item_id=103, rating=2.0),
    Row(user_id=3, item_id=104, rating=4.5),
    Row(user_id=3, item_id=105, rating=3.0)
]

# Create a DataFrame from the list of Rows
df = spark.createDataFrame(data)
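For context, this is the schema Spark infers for that DataFrame (the IDs come out as longs, the rating as a double):

df.printSchema()
# root
#  |-- user_id: long (nullable = true)
#  |-- item_id: long (nullable = true)
#  |-- rating: double (nullable = true)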
This is how I train and log my model:

import mlflow
from mlflow.models import infer_signature
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

with mlflow.start_run(run_name="ALS_best_model") as run:
    fe_full_data, df_full_data, df_train, df_test = split_data()
    als = ALS()

    # Now we set the parameters for the method
    als.setMaxIter(MAX_ITER)\
        .setSeed(SEED)\
        .setRegParam(best_params["REG_PARAM"])\
        .setUserCol(COL_USER)\
        .setItemCol(COL_ITEM)\
        .setRatingCol(COL_LABEL)\
        .setRank(best_params["RANK"])

    mlflow.log_param("MAX_ITER", MAX_ITER)
    mlflow.log_param("RANK", best_params["RANK"])
    mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])

    # Create the model with these parameters
    model = als.fit(df_train)

    # Drop predictions for users and items that appear in the test set
    # but didn't make it into the training set; those predictions are NaN
    model.setColdStartStrategy('drop')
    predictions = model.transform(df_train)

    signature = infer_signature(model_input=df_train, model_output=predictions.select(COL_PRED))

    # Log the model
    dataset = mlflow.data.from_spark(df_train)
    #mlflow.log_input(dataset, "training")
    mlflow.spark.log_model(model, model_name,
                           sample_input=df_train.limit(20),
                           signature=signature,
                           conda_env=mlflow.spark.get_default_conda_env(),
                           registered_model_name=f"{catalog_name}.{model_schema}.{model_name}")
    mlflow.log_text("\n".join(df_train.limit(3).toJSON().collect()), "sample_input.json")
    #mlflow.spark.log_model(model, "model")

    evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
    rmse = evaluator.setMetricName("rmse").evaluate(predictions)
    mlflow.log_metric('rmse', rmse)
I also tried sample_input=df_train.limit(20).toPandas(). Both variants throw this error:
ModuleNotFoundError: No module named 'mleap'
File <command-2685032901804064>, line 27
     25 dataset = mlflow.data.from_spark(df_train)
26 #mlflow.log_input(dataset, "training")
---> 27 mlflow.spark.log_model(model, model_name,
     28     sample_input=df_train.limit(20),
29 signature = signature,
30 conda_env=mlflow.spark.get_default_conda_env(),
31 registered_model_name=f"{catalog_name}.{model_schema}.{model_name}")
33 #mlflow.spark.log_model(model, "model")
34 evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
File /databricks/python/lib/python3.11/site-packages/mlflow/mleap/__init__.py:220, in add_to_model(mlflow_model, path, spark_model, sample_input)
206 @deprecated(alternative="mlflow.onnx", since="2.6.0")
207 @keyword_only
208 def add_to_model(mlflow_model, path, spark_model, sample_input):
209 """
210 Add the MLeap flavor to an existing MLflow model.
211
(...)
218 required by MLeap for data schema inference.
219 """
--> 220 import mleap.version
222 # This import statement adds `serializeToBundle` and `deserializeFromBundle` to `Transformer`:
    223     # https://github.com/combust/mleap/blob/37f6f61634798118e2c2eb820ceeccf9d234b810/python/mleap/pyspark/spark_support.py#L32-L33
224 from mleap.pyspark.spark_support import SimpleSparkSerializer # noqa: F401
When I remove the sample_input parameter, everything works fine. I managed to work around the issue by converting the training data to a list of dictionaries, input_example=df_train.limit(20).toPandas().to_dict(orient="records"), but I'm not sure if this is the best way to do it.
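For reference, this is a minimal sketch of the call that works for me; with input_example instead of sample_input, MLflow stores the rows as the model's input example and the MLeap flavor (which needs the mleap package) is never touched:

# Same call as above, but with input_example instead of sample_input
mlflow.spark.log_model(
    model,
    model_name,
    input_example=df_train.limit(20).toPandas().to_dict(orient="records"),
    signature=signature,
    conda_env=mlflow.spark.get_default_conda_env(),
    registered_model_name=f"{catalog_name}.{model_schema}.{model_name}",
)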
I'm using a Databricks cluster with runtime 15.3 (Spark 3.5, Scala 2.12).