Reputation: 3407
Is there any performance difference between using 2 separate pipelines vs 1 combined pipeline?
For example, 2 separate pipelines:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))
pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df = pipeline1.fit(df).transform(df)
df = pipeline2.fit(df).transform(df)
1 combined pipeline:
pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df = pipeline.fit(df).transform(df)
Upvotes: 0
Views: 489
Reputation: 1616
I converted the DataFrame to an RDD and then looked at the resulting job lineage. It shows that 2 separate pipelines need more steps to fulfill the job than one combined pipeline. See below:
2 separate pipelines:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))
pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df = pipeline1.fit(df).transform(df)
print(df.rdd.toDebugString())
df = pipeline2.fit(df).transform(df)
print(df.rdd.toDebugString())
b'(8) MapPartitionsRDD[124] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[123] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | SQLExecutionRDD[122] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[121] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []\n | MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []\n | PythonRDD[117] at RDD at PythonRDD.scala:53 []\n | ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []'
b'(8) MapPartitionsRDD[128] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[127] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | SQLExecutionRDD[126] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[125] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []\n | MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []\n | PythonRDD[117] at RDD at PythonRDD.scala:53 []\n | ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []'
1 pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))
pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df2 = pipeline.fit(df).transform(df)
print(df2.rdd.toDebugString())
b'(8) MapPartitionsRDD[79] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[78] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | SQLExecutionRDD[77] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[76] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[75] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[74] at map at SerDeUtil.scala:69 []\n | MapPartitionsRDD[73] at mapPartitions at SerDeUtil.scala:117 []\n | PythonRDD[72] at RDD at PythonRDD.scala:53 []\n | ParallelCollectionRDD[71] at readRDDFromFile at PythonRDD.scala:274 []'
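If you want to cross-check this on the SQL side as well, you can also compare the query plans Spark builds for the two variants with DataFrame.explain(). This is just a minimal sketch, not part of the measurement above; df_sep and df_comb are placeholder names for the DataFrame produced by the two chained pipelines and by the single combined pipeline, respectively:
# df_sep: result of pipeline1.fit(df).transform(df) followed by pipeline2
# df_comb: result of the single combined pipeline
# explain(True) prints the parsed, analyzed, optimized and physical plans
df_sep.explain(True)
df_comb.explain(True)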
I hope this answers your question.
Upvotes: 1