Tim

Reputation: 3407

PySpark Pipeline Performance

Is there any performance difference between using 2 separate pipelines and 1 combined pipeline?

For example, 2 separate pipelines:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df = pipeline1.fit(df).transform(df)
df = pipeline2.fit(df).transform(df)

1 combined pipeline:

pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df = pipeline.fit(df).transform(df)
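
One way to measure the wall-clock difference would be a harness like this (a minimal sketch, assuming a local SparkSession; count() is just an action to force the lazy transforms to execute, and on a 2-row toy DataFrame any difference is noise, so this would only be meaningful on real data):

import time

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

base = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

def timed(build):
    # count() is an action that forces the lazy transforms to run
    start = time.perf_counter()
    build().count()
    return time.perf_counter() - start

def separate():
    # two pipelines, fitted and applied one after the other
    out = Pipeline(stages=[
        VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
    ]).fit(base).transform(base)
    return Pipeline(stages=[
        VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
    ]).fit(out).transform(out)

def combined():
    # one pipeline containing both stages
    return Pipeline(stages=[
        VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
        VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
    ]).fit(base).transform(base)

print(f"separate: {timed(separate):.3f}s  combined: {timed(combined):.3f}s")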

Upvotes: 0

Views: 489

Answers (1)

JAdel

Reputation: 1616

I converted the DataFrame to an RDD and looked at its lineage with toDebugString. It shows that 2 separate pipelines need more steps to fulfill the job than one combined pipeline. See below:

2 separate pipelines:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
df = pipeline1.fit(df).transform(df)
print(df.rdd.toDebugString().decode())

(8) MapPartitionsRDD[124] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[123] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  SQLExecutionRDD[122] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[121] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []
 |  MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []
 |  PythonRDD[117] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []

df = pipeline2.fit(df).transform(df)
print(df.rdd.toDebugString().decode())

(8) MapPartitionsRDD[128] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[127] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  SQLExecutionRDD[126] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[125] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[120] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[119] at map at SerDeUtil.scala:69 []
 |  MapPartitionsRDD[118] at mapPartitions at SerDeUtil.scala:117 []
 |  PythonRDD[117] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[116] at readRDDFromFile at PythonRDD.scala:274 []

Note how the second transform builds new RDDs (125-128) on top of the base it shares with the first one (116-120): each transform triggers its own query execution.

1 combined pipeline:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1"),
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

df2 = pipeline.fit(df).transform(df)
print(df2.rdd.toDebugString().decode())

(8) MapPartitionsRDD[79] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[78] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  SQLExecutionRDD[77] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[76] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[75] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[74] at map at SerDeUtil.scala:69 []
 |  MapPartitionsRDD[73] at mapPartitions at SerDeUtil.scala:117 []
 |  PythonRDD[72] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[71] at readRDDFromFile at PythonRDD.scala:274 []
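
Since transform() is lazy, another way to compare the two variants is to look at the query plans Spark produces. A minimal sketch along the same lines, assuming the same session as above; explain(True) prints the parsed, analyzed and optimized logical plans plus the physical plan:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

asm1 = VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
asm2 = VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")

# two separate pipelines, chained
df_a = Pipeline(stages=[asm1]).fit(df).transform(df)
df_a = Pipeline(stages=[asm2]).fit(df_a).transform(df_a)

# one combined pipeline
df_b = Pipeline(stages=[asm1, asm2]).fit(df).transform(df)

# parsed, analyzed and optimized logical plans plus the physical plan
df_a.explain(True)
df_b.explain(True)

If the optimized plans come out the same, that would suggest the extra lineage above is mostly per-job overhead on the driver rather than extra work on the executors.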

I hope this answers your question.

Upvotes: 1
