Reputation: 322
I have two DataFrames with the same columns and I want to convert a categorical column into a vector using one-hot encoding. The problem is that, for example, the training set may contain 3 unique values while the test set may contain fewer.
Training Set:  Test Set:
+------------+ +------------+
| Type | | Type |
+------------+ +------------+
| 0 | | 0 |
| 1 | | 1 |
| 1 | | 1 |
| 3 | | 1 |
+------------+ +------------+
In this case the OneHotEncoder creates vectors of different lengths on the training and test sets (since each element of the vector represents the presence of a unique value).
Is it possible to use the same OneHotEncoder on multiple DataFrames? There is no fit function, so I don't know how I could do that.
Thanks.
Upvotes: 1
Views: 1774
Reputation: 330353
Spark >= 3.0:
The old-style OneHotEncoder has been removed and OneHotEncoderEstimator has been renamed to OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel
encoder = (OneHotEncoder()
.setInputCols(["type"])
.setOutputCols(["encoded"])
.setDropLast(False))
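The fit/transform workflow is then the same as with the 2.3 estimator shown below; a minimal sketch, assuming the training and testing DataFrames from the question:
model = encoder.fit(training)    # learns the number of categories from the training data
model.transform(testing).show()  # test vectors get the same length as the training vectors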
Spark >= 2.3:
Spark 2.3 added new OneHotEncoderEstimator and OneHotEncoderModel classes which work as you would expect them to work here.
from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel
encoder = (OneHotEncoderEstimator()
.setInputCols(["type"])
.setOutputCols(["encoded"])
.setDropLast(False))
model = encoder.fit(training) # type: OneHotEncoderModel
model.transform(training).show()
# +----+-------------+
# |type| encoded|
# +----+-------------+
# | 0.0|(4,[0],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 3.0|(4,[3],[1.0])|
# +----+-------------+
model.transform(testing).show()
# +----+-------------+
# |type| encoded|
# +----+-------------+
# | 0.0|(4,[0],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# | 1.0|(4,[1],[1.0])|
# +----+-------------+
Spark < 2.3:
OneHotEncoder is not intended to be used alone. Instead it should be part of a Pipeline where it can leverage column metadata. Consider the following example:
training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(["type"])
testing = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(["type"])
When you use the encoder directly, it has no knowledge of the context:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setOutputCol("encoded").setDropLast(False)
encoder.setInputCol("type").transform(training).show()
## +----+-------------+
## |type| encoded|
## +----+-------------+
## | 0.0|(4,[0],[1.0])|
## | 1.0|(4,[1],[1.0])|
## | 1.0|(4,[1],[1.0])|
## | 3.0|(4,[3],[1.0])|
## +----+-------------+
encoder.setInputCol("type").transform(testing).show()
## +----+-------------+
## |type| encoded|
## +----+-------------+
## | 0.0|(2,[0],[1.0])|
## | 1.0|(2,[1],[1.0])|
## | 1.0|(2,[1],[1.0])|
## | 1.0|(2,[1],[1.0])|
## +----+-------------+
Now let's add the required metadata. This can be done, for example, by using StringIndexer:
from pyspark.ml.feature import StringIndexer

indexer = (StringIndexer()
.setInputCol("type")
.setOutputCol("type_idx")
.fit(training))
If you apply the encoder to the indexed column, you'll get consistent encoding on both data sets:
(encoder.setInputCol("type_idx")
.transform(indexer.transform(training))
.show())
## +----+--------+-------------+
## |type|type_idx| encoded|
## +----+--------+-------------+
## | 0.0| 1.0|(3,[1],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 3.0| 2.0|(3,[2],[1.0])|
## +----+--------+-------------+
(encoder.setInputCol("type_idx")
.transform(indexer.transform(testing))
.show())
## +----+--------+-------------+
## |type|type_idx| encoded|
## +----+--------+-------------+
## | 0.0| 1.0|(3,[1],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## | 1.0| 0.0|(3,[0],[1.0])|
## +----+--------+-------------+
Please note that the labels you get this way don't reflect the values in the input data. If consistent encoding is a hard requirement, you should provide the schema manually:
from pyspark.sql.types import StructType, StructField, DoubleType
meta = {"ml_attr": {
"name": "type",
"type": "nominal",
"vals": ["0.0", "1.0", "3.0"]
}}
schema = StructType([StructField("type", DoubleType(), False, meta)])
training = sc.parallelize([(0., ), (1., ), (1., ), (3., )]).toDF(schema)
testing = sc.parallelize([(0., ), (1., ), (1., ), (1., )]).toDF(schema)
assert (
encoder.setInputCol("type").transform(training).first()[-1].size ==
encoder.setInputCol("type").transform(testing).first()[-1].size
)
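Since the encoder is meant to be used as part of a Pipeline, the indexer and encoder can also be chained into a single fitted model; a minimal sketch (the exact staging is my assumption, not part of the original answer):
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

pipeline = Pipeline(stages=[
    StringIndexer(inputCol="type", outputCol="type_idx"),                    # attaches label metadata
    OneHotEncoder(inputCol="type_idx", outputCol="encoded", dropLast=False)  # encodes using that metadata
])
model = pipeline.fit(training)    # fit once on the training data
model.transform(testing).show()   # encoding stays consistent with training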
Upvotes: 4
Reputation: 933
We can extend this to a dataset with multiple categorical columns by creating metadata for each column and creating multiple OneHotEncoders. These steps can be staged in a Pipeline.
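For example, with OneHotEncoderEstimator (Spark >= 2.3) a single stage can encode several columns at once; a minimal sketch, where type_a and type_b are hypothetical column names:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(
    inputCols=["type_a", "type_b"],           # hypothetical categorical columns
    outputCols=["type_a_vec", "type_b_vec"],  # one output vector per column
    dropLast=False)
model = Pipeline(stages=[encoder]).fit(training)  # fit once on the training data
model.transform(testing).show()                   # both columns encoded consistently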
Upvotes: -1