Pierre Gourseaud

Is cross-validation faster without using pipelines in spark-ml?

Suppose my feature engineering has many steps, so my pipeline contains many transformers. I am wondering how Spark handles these transformers during cross-validation of the pipeline: are they executed again for each fold? Would it be faster to apply the transformers before cross-validating the model?

Which of these workflows would be the fastest (or is there a better solution)?

1. Cross validator on pipeline

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator

transformer1 = ...
transformer2 = ...
transformer3 = ...
lr = LogisticRegression(...)
pipeline = Pipeline(stages=[transformer1, transformer2, transformer3, lr])
crossval = CrossValidator(estimator=pipeline, numFolds=10, ...)

cvModel = crossval.fit(training)
prediction = cvModel.transform(test)

2. Cross validator after pipeline

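from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator

transformer1 = ...
transformer2 = ...
transformer3 = ...
pipeline = Pipeline(stages=[transformer1, transformer2, transformer3])
pipelineModel = pipeline.fit(training)  # fit the feature stages once
training_trans = pipelineModel.transform(training)

lr = LogisticRegression(...)
crossval = CrossValidator(estimator=lr, numFolds=10, ...)

cvModel = crossval.fit(training_trans)
# The test set must go through the same fitted feature stages before scoring
prediction = cvModel.transform(pipelineModel.transform(test))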
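A back-of-the-envelope count of how many fits each workflow triggers may help frame the question. It assumes the behaviour described in the Spark ML tuning guide: `CrossValidator` fits its estimator once per (fold, param map) pair, then refits it once on the whole dataset with the best param map. The helper `count_fits` and the 3-entry param grid are hypothetical, chosen only for illustration.

```python
def count_fits(num_folds, num_param_maps, features_inside_cv):
    """Return (feature-stage fits, model fits) for one CrossValidator run.

    Assumes CrossValidator fits its estimator once per (fold, param map)
    and once more on the full data with the best param map.
    """
    cv_fits = num_folds * num_param_maps + 1
    if features_inside_cv:
        # Workflow 1: the whole Pipeline is the estimator, so every
        # feature stage is refit (and re-applied) for every CV fit.
        feature_fits = cv_fits
    else:
        # Workflow 2: the feature pipeline is fit once, before CV.
        feature_fits = 1
    return feature_fits, cv_fits


# numFolds=10 as in the question; a grid of 3 param maps is an assumption.
print(count_fits(10, 3, features_inside_cv=True))   # (31, 31)
print(count_fits(10, 3, features_inside_cv=False))  # (1, 31)
```

The number of model fits is the same either way; the difference is the 30 extra passes over the feature stages in workflow 1. The flip side is that workflow 1 fits the feature stages only on each fold's training part, whereas workflow 2 fits them on all of `training` before any split.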

Finally, I have the same question about caching: in 2., I could cache training_trans before cross-validating; in 1., I could put a Cacher transformer in the pipeline just before the LogisticRegression (see Caching intermediate results in Spark ML pipeline for the Cacher).

Upvotes: 2

Answers (2)

CuriousKK

At a recent spark.ml training I attended, the advice was to follow this approach:

from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=lr, ...)
pipeline = Pipeline(stages=[idx, assembler, cv])
cv_model = pipeline.fit(train)
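With this layout the feature stages are fit once, on all of `train`, and the fitted model then applies the same learned transformations to any new data before the cross-validated model scores it. A pure-Python toy of that flow is sketched below; `ToyIndexer`, `ToyCV` and `fit_pipeline` are hypothetical stand-ins for `StringIndexer`, `CrossValidator` and `Pipeline.fit`, not Spark APIs, and `ToyCV` only counts the fits a real cross-validator would run.

```python
class ToyIndexer:
    """Estimator stand-in: learns a category -> index mapping (like StringIndexer)."""
    fits = 0  # class-level counter of fit() calls

    def fit(self, rows):
        ToyIndexer.fits += 1
        # Alphabetical order here; the real StringIndexer orders by frequency by default.
        mapping = {c: i for i, c in enumerate(sorted({r["cat"] for r in rows}))}
        # The "fitted model" is a function that adds an index column.
        return lambda rs: [dict(r, idx=mapping[r["cat"]]) for r in rs]


class ToyCV:
    """CrossValidator stand-in: records how many inner model fits it would run."""

    def __init__(self, num_folds, num_param_maps):
        self.num_folds = num_folds
        self.num_param_maps = num_param_maps
        self.model_fits = 0

    def fit(self, rows):
        # One fit per (fold, param map), plus a final refit on all rows.
        self.model_fits = self.num_folds * self.num_param_maps + 1
        # Dummy "best model": predicts 1 when idx > 0, else 0.
        return lambda rs: [dict(r, prediction=int(r["idx"] > 0)) for r in rs]


def fit_pipeline(stages, rows):
    """Like Pipeline.fit: fit each stage on the output of the stages before it."""
    fitted = []
    for i, stage in enumerate(stages):
        model = stage.fit(rows)
        fitted.append(model)
        if i < len(stages) - 1:
            rows = model(rows)  # feed transformed rows to the next stage

    def transform(rs):
        # Like PipelineModel.transform: apply every fitted stage in order.
        for model in fitted:
            rs = model(rs)
        return rs

    return transform


train = [{"cat": "a"}, {"cat": "b"}, {"cat": "a"}]
test = [{"cat": "b"}, {"cat": "a"}]
cv = ToyCV(num_folds=10, num_param_maps=3)
model = fit_pipeline([ToyIndexer(), cv], train)
print(model(test))  # [{'cat': 'b', 'idx': 1, 'prediction': 1}, {'cat': 'a', 'idx': 0, 'prediction': 0}]
print(ToyIndexer.fits, cv.model_fits)  # 1 31
```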

Upvotes: 1

Pierre Gourseaud

I ran the experiment, but I'm still interested in a more detailed answer if anyone can give one.

%%time
pipeline1 = Pipeline(stages=stringIndexers+oneHotEncoders+[vectorAssembler])
train2 = pipeline1.fit(train).transform(train)
crossval = CrossValidator(estimator=logisticRegression, ...)
crossval.fit(train2)

CPU times: user 508 ms, sys: 136 ms, total: 644 ms
Wall time: 2min 2s

%%time
pipeline1 = Pipeline(stages=stringIndexers+oneHotEncoders+[vectorAssembler])
train2 = pipeline1.fit(train).transform(train)
train2.cache().count()
crossval = CrossValidator(estimator=logisticRegression, ...)
crossval.fit(train2)

CPU times: user 560 ms, sys: 104 ms, total: 664 ms
Wall time: 1min 25s

%%time
pipeline2 = Pipeline(stages=stringIndexers+oneHotEncoders+[vectorAssembler, logisticRegression])
crossval = CrossValidator(estimator=pipeline2, ...)
crossval.fit(train)

CPU times: user 2.06 s, sys: 504 ms, total: 2.56 s
Wall time: 3min
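The reported wall times can be turned into relative speed-ups with a little arithmetic. The snippet below only converts the three timings above to seconds and divides them by the full-pipeline-inside-CV baseline; the dictionary labels are descriptive names, not anything from Spark.

```python
# Wall times reported above, converted to seconds.
wall_seconds = {
    "pipeline inside CrossValidator": 3 * 60,
    "features first, no cache": 2 * 60 + 2,
    "features first, cached": 1 * 60 + 25,
}

baseline = wall_seconds["pipeline inside CrossValidator"]
for name, secs in wall_seconds.items():
    print(f"{name}: {secs} s, speed-up {baseline / secs:.2f}x")
# pipeline inside CrossValidator: 180 s, speed-up 1.00x
# features first, no cache: 122 s, speed-up 1.48x
# features first, cached: 85 s, speed-up 2.12x
```

These are single runs on one dataset, so the ratios are indicative only. Note also that the CPU times printed by `%%time` cover only the Python driver process; the actual work runs in the Spark JVM and executors, which is why CPU time stays under 3 s while wall time is measured in minutes.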

Upvotes: 0