Reputation: 43
I am experienced in Python but totally new to PySpark. I have a DataFrame that contains about 50M rows, with several categorical features, each of which I have one-hot encoded. Here's a simplified but representative example of the code.
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
df = sc.parallelize([
(1, 'grocery'),
(1, 'drinks'),
(1, 'bakery'),
(2, 'grocery'),
(3, 'bakery'),
(3, 'bakery'),
]).toDF(["id", "category"])
indexer = StringIndexer(inputCol='category', outputCol='categoryIndex')
encoder = OneHotEncoder(inputCol='categoryIndex', outputCol='categoryVec')
pipe = Pipeline(stages = [indexer, encoder])
newDF = pipe.fit(df).transform(df)
This gives the following output:
+---+--------+-------------+-------------+
| id|category|categoryIndex| categoryVec|
+---+--------+-------------+-------------+
| 1| grocery| 1.0|(2,[1],[1.0])|
| 1| drinks| 2.0| (2,[],[])|
| 1| bakery| 0.0|(2,[0],[1.0])|
| 2| grocery| 1.0|(2,[1],[1.0])|
| 3| bakery| 0.0|(2,[0],[1.0])|
| 3| bakery| 0.0|(2,[0],[1.0])|
+---+--------+-------------+-------------+
I would now like to groupBy 'id' and aggregate the 'categoryVec' column with a sum, so that I get one row per id with a vector indicating which of the (possibly several) categories that customer shopped in. In pandas this would simply be a matter of applying sum/mean to each column produced by the pd.get_dummies() step, but here it doesn't seem to be so simple.
I will then pass the output to ML algorithms so I will need to be able to use VectorAssembler or similar on the output.
Oh, and I really need a pyspark solution.
Many thanks for your help!
Upvotes: 4
Views: 2878
Reputation: 1497
You can use CountVectorizer for this. Collect the categories for each id into an array, and CountVectorizer converts each array into a vector of per-category counts.
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import functions as F
df = sc.parallelize([
(1, 'grocery'),
(1, 'drinks'),
(1, 'bakery'),
(2, 'grocery'),
(3, 'bakery'),
(3, 'bakery'),
]).toDF(["id", "category"]) \
.groupBy('id') \
.agg(F.collect_list('category').alias('categoryIndexes'))
cv = CountVectorizer(inputCol='categoryIndexes', outputCol='categoryVec')
transformed_df = cv.fit(df).transform(df)
transformed_df.show()
Result:
+---+--------------------+--------------------+
| id| categoryIndexes| categoryVec|
+---+--------------------+--------------------+
| 1|[grocery, drinks,...|(3,[0,1,2],[1.0,1...|
| 3| [bakery, bakery]| (3,[0],[2.0])|
| 2| [grocery]| (3,[1],[1.0])|
+---+--------------------+--------------------+
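To make the vectors above easier to read, here is a rough plain-Python sketch of what the groupBy/collect_list plus CountVectorizer steps compute on the sample data. It is only an illustration, not Spark's implementation: the names `grouped`, `vocabulary` and `count_vector` are made up for this example, and the vocabulary ordering (most frequent category first) is assumed from the output shown above. The `binary` flag mimics CountVectorizer's `binary` parameter, which caps every nonzero count at 1 and may be closer to the 0/1 indicator the question asks for.

```python
from collections import Counter, defaultdict

# Same sample rows as in the question: (id, category).
rows = [
    (1, "grocery"), (1, "drinks"), (1, "bakery"),
    (2, "grocery"), (3, "bakery"), (3, "bakery"),
]

# Step 1: gather the categories per id (the role of groupBy + collect_list).
grouped = defaultdict(list)
for row_id, category in rows:
    grouped[row_id].append(category)

# Step 2: build a vocabulary ordered by overall frequency, most frequent first
# (assumption: this matches the index order seen in the Spark output above).
totals = Counter(category for _, category in rows)
vocabulary = [term for term, _ in totals.most_common()]
index_of = {term: i for i, term in enumerate(vocabulary)}

def count_vector(categories, binary=False):
    """Return a dense count vector; with binary=True, cap each count at 1."""
    vec = [0.0] * len(vocabulary)
    for category in categories:
        vec[index_of[category]] += 1.0
    if binary:
        vec = [1.0 if value > 0 else 0.0 for value in vec]
    return vec

count_vectors = {i: count_vector(cats) for i, cats in grouped.items()}
indicator_vectors = {i: count_vector(cats, binary=True) for i, cats in grouped.items()}

for row_id in sorted(grouped):
    print(row_id, grouped[row_id], count_vectors[row_id], indicator_vectors[row_id])
```

Note that id 3 gets a count of 2.0 for bakery in the count vector but 1.0 in the indicator version. The result is a regular vector column, so it should be usable with VectorAssembler like any other feature column.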
Upvotes: 3