Nipun Talukdar

Reputation: 5387

How do we retrain an XGBoost model using a Spark ML pipeline?

Our requirement is to retrain an XGBoost model that was created by a Spark ML pipeline. We receive a large amount of new data every week or month, and we want to take the previously trained model and continue training it on only that delta. Is this possible? If so, some sample code would be a great help. Our Spark jobs are written in Scala.

I can do this with PySpark; sample code is given below:

from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
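from pyspark.sql.functions import col

# Create (or reuse) the Spark session that the code below refers to as `spark`
spark = SparkSession.builder.getOrCreate()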

existing_model_path = "/home/geet/test/xgboostexp/testmodel/stages/2_SparkXGBClassifier_5adc1ebbc456"
xgb_model = SparkXGBClassifierModel.load(existing_model_path)

new_data_path = "/home/geet/test/xgboostexp/output_file.csv"
data = spark.read.option("header", True).csv(new_data_path)


for col_name in ["V1", "V2", "V3"]:
    data = data.withColumn(col_name, col(col_name).cast('float'))

label_indexer = StringIndexer(inputCol="class", outputCol="indexedLabel").fit(data)
assembler = VectorAssembler(inputCols=["V1", "V2", "V3"], outputCol="features")
xgb_classifier = SparkXGBClassifier(
    label_col="indexedLabel",
    features_col="features",
    num_workers=1,
    xgb_model=xgb_model.get_booster()  # Use the previous model as the base
)

pipeline = Pipeline(stages=[label_indexer, assembler, xgb_classifier])
pipeline_model = pipeline.fit(data)
updated_model_path = "/home/geet/test/xgboostexp/testmodel2"
# Save the updated pipeline, replacing anything already at that path
pipeline_model.write().overwrite().save(updated_model_path)
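
One fragile spot in the snippet above is the hard-coded stage directory (`2_SparkXGBClassifier_5adc1ebbc456`): the suffix is the stage's uid, so it will most likely differ for each newly saved pipeline. Below is a minimal sketch of looking it up instead. It assumes the pipeline was saved to a local filesystem and follows the `stages/<index>_<uid>` layout visible in the path above; `find_stage_dir` is a hypothetical helper, not part of Spark or XGBoost.

```python
import os
import re


def find_stage_dir(pipeline_path, stage_class="SparkXGBClassifier"):
    """Return the saved stage directory whose name contains `stage_class`.

    Hypothetical helper: assumes a local filesystem and the
    `stages/<index>_<uid>` layout, where the uid starts with the class name.
    """
    stages_root = os.path.join(pipeline_path, "stages")
    pattern = re.compile(r"^(\d+)_" + re.escape(stage_class) + r"_")

    matches = []
    for name in os.listdir(stages_root):
        m = pattern.match(name)
        if m and os.path.isdir(os.path.join(stages_root, name)):
            # Keep the numeric stage index so we can order the matches
            matches.append((int(m.group(1)), name))

    if not matches:
        raise FileNotFoundError(f"no {stage_class} stage under {stages_root}")

    # If several stages match, pick the one that comes last in the pipeline
    _, name = max(matches)
    return os.path.join(stages_root, name)
```

With this, `existing_model_path` could be set to `find_stage_dir("/home/geet/test/xgboostexp/testmodel")` before calling `SparkXGBClassifierModel.load`. For HDFS or object storage the directory listing would have to go through the corresponding filesystem API instead of `os.listdir`.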
