Moris Huxley

Reputation: 388

How to encode labels from array in pyspark

For example, I have a DataFrame with categorical features stored as arrays in the name column:

 from pyspark.sql import SparkSession

 spark = (SparkSession.builder.master("local").appName("example")
          .config("spark.some.config.option", "some-value").getOrCreate())

 features = [(['a', 'b', 'c'], 1),
             (['a', 'c'], 2),
             (['d'], 3),
             (['b', 'c'], 4), 
             (['a', 'b', 'd'], 5)]

 df = spark.createDataFrame(features, ['name','id'])
 df.show()

Out:

+---------+---+
|     name| id|
+---------+---+
|[a, b, c]|  1|
|   [a, c]|  2|
|      [d]|  3|
|   [b, c]|  4|
|[a, b, d]|  5|
+---------+---+

What I want to get:

+--------+--------+--------+--------+----+
| name_a | name_b | name_c | name_d | id |
+--------+--------+--------+--------+----+
| 1      | 1      | 1      | 0      | 1  |
+--------+--------+--------+--------+----+
| 1      | 0      | 1      | 0      | 2  |
+--------+--------+--------+--------+----+
| 0      | 0      | 0      | 1      | 3  |
+--------+--------+--------+--------+----+
| 0      | 1      | 1      | 0      | 4  |
+--------+--------+--------+--------+----+
| 1      | 1      | 0      | 1      | 5  |
+--------+--------+--------+--------+----+

I found the same question, but none of its answers were helpful. I tried to use VectorIndexer from pyspark.ml, but I ran into problems converting the name field to a vector type.

 from pyspark.ml.feature import VectorIndexer

 indexer = VectorIndexer(inputCol="name", outputCol="indexed", maxCategories=5)
 indexerModel = indexer.fit(df)

I get the following error:

Column name must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType

I found a solution here, but it looks overcomplicated, and I'm not sure whether this can be done with VectorIndexer alone.

Upvotes: 4

Views: 2930

Answers (2)

user2314737

Reputation: 29347

With explode from pyspark.sql.functions, followed by pivot:

from pyspark.sql import functions as F
features = [(['a', 'b', 'c'], 1),
            (['a', 'c'], 2),
            (['d'], 3),
            (['b', 'c'], 4),
            (['a', 'b', 'd'], 5)]
df = spark.createDataFrame(features, ['name','id'])
df.show()
+---------+---+
|     name| id|
+---------+---+
|[a, b, c]|  1|
|   [a, c]|  2|
|      [d]|  3|
|   [b, c]|  4|
|[a, b, d]|  5|
+---------+---+

df = df.withColumn('exploded', F.explode('name'))

df.drop('name').groupby('id').pivot('exploded').count().show()
+---+----+----+----+----+
| id|   a|   b|   c|   d|
+---+----+----+----+----+
|  5|   1|   1|null|   1|
|  1|   1|   1|   1|null|
|  3|null|null|null|   1|
|  2|   1|null|   1|null|
|  4|null|   1|   1|null|
+---+----+----+----+----+

Sort by id and convert null to 0:

df.drop('name').groupby('id').pivot('exploded').count().na.fill(0).sort(F.col('id').asc()).show()
+---+---+---+---+---+
| id|  a|  b|  c|  d|
+---+---+---+---+---+
|  1|  1|  1|  1|  0|
|  2|  1|  0|  1|  0|
|  3|  0|  0|  0|  1|
|  4|  0|  1|  1|  0|
|  5|  1|  1|  0|  1|
+---+---+---+---+---+

explode returns a new row for each element in the given array or map. You can then use pivot to "transpose" the new column.
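
To make the two steps concrete, here is a minimal plain-Python sketch (no Spark involved) of what explode followed by groupby/pivot/count and na.fill(0) computes on the sample data. The variable names (rows, exploded, columns, table) are made up for illustration and are not part of any Spark API.

```python
from collections import Counter

rows = [(['a', 'b', 'c'], 1),
        (['a', 'c'], 2),
        (['d'], 3),
        (['b', 'c'], 4),
        (['a', 'b', 'd'], 5)]

# "explode": emit one (id, label) pair per element of each array
exploded = [(row_id, label) for labels, row_id in rows for label in labels]

# "pivot": every distinct label becomes its own column
columns = sorted({label for _, label in exploded})

# "count" per (id, label); a Counter returns 0 for missing pairs,
# which plays the role of na.fill(0)
counts = Counter(exploded)
table = {row_id: [counts[(row_id, column)] for column in columns]
         for _, row_id in rows}

print(columns)
for row_id in sorted(table):
    print(row_id, table[row_id])
```

Because the cells are counts, a label that appears twice in one array would give 2 rather than 1 in that cell, so duplicates need to be removed first if strictly binary indicators are wanted.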

Upvotes: 2

10465355

Reputation: 4631

If you want to use the output with Spark ML, it is best to use CountVectorizer:

from pyspark.ml.feature import CountVectorizer

# Add binary=True if needed
df_enc = (CountVectorizer(inputCol="name", outputCol="name_vector")
    .fit(df)
    .transform(df))
df_enc.show(truncate=False)
+---------+---+-------------------------+
|name     |id |name_vector              |
+---------+---+-------------------------+
|[a, b, c]|1  |(4,[0,1,2],[1.0,1.0,1.0])|
|[a, c]   |2  |(4,[0,1],[1.0,1.0])      |
|[d]      |3  |(4,[3],[1.0])            |
|[b, c]   |4  |(4,[1,2],[1.0,1.0])      |
|[a, b, d]|5  |(4,[0,2,3],[1.0,1.0,1.0])|
+---------+---+-------------------------+
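
The name_vector column prints sparse vectors as (size, [indices], [values]). As a rough illustration of how to read that notation, the plain-Python sketch below expands one such triple into a dense list and labels each slot. The to_dense helper is hypothetical, and the vocabulary order ['a', 'c', 'b', 'd'] is only inferred from the output above. In a real run it should be read from the fitted model's vocabulary attribute, since the order can differ.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense

# Assumption: vocabulary order inferred from the printed output above.
vocabulary = ['a', 'c', 'b', 'd']

# Row with id 2, name [a, c], printed as (4,[0,1],[1.0,1.0])
dense = to_dense(4, [0, 1], [1.0, 1.0])

# Pair each slot with its term to get named 0/1 indicators
labelled = {"name_{}".format(term): int(value)
            for term, value in zip(vocabulary, dense)}
print(labelled)
```

Note that the snippet above chains fit and transform, so the fitted model would have to be kept in a variable to look up its vocabulary.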

Otherwise collect distinct values:

from pyspark.sql.functions import array_contains, col, explode

names = [
    x[0] for x in 
    df.select(explode("name").alias("name")).distinct().orderBy("name").collect()]

and select the columns with array_contains:

df_sep = df.select("*", *[
    array_contains("name", name).cast("integer").alias("name_{}".format(name))
    for name in names]
)
df_sep.show()
+---------+---+------+------+------+------+
|     name| id|name_a|name_b|name_c|name_d|
+---------+---+------+------+------+------+
|[a, b, c]|  1|     1|     1|     1|     0|
|   [a, c]|  2|     1|     0|     1|     0|
|      [d]|  3|     0|     0|     0|     1|
|   [b, c]|  4|     0|     1|     1|     0|
|[a, b, d]|  5|     1|     1|     0|     1|
+---------+---+------+------+------+------+

Upvotes: 4
