Reputation: 1599
My question is related to my previous one: transform columns values to columns in pyspark dataframe
I have created a table "my_df" (a dataframe in pyspark):
+----+--------------+--------------------------+
|id  |payment       |shop                      |
+----+--------------+--------------------------+
|dapd|[credit, cash]|[retail, on-line]         |
|wrfr|[cash, debit] |[supermarket, brand store]|
+----+--------------+--------------------------+
Now, I need to do clustering on this table so that I can find the similarity between the "id"s. I am trying k-means first. So, I need to transform the categorical values to numerical values by one-hot encoding. I am referring to How to handle categorical features with spark-ml?
my code:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
inputs, my_indx_list = [], []
for a_col in my_df.columns:
    my_indx = StringIndexer(inputCol = a_col, outputCol = a_col + "_index")
    inputs.append(my_indx.getOutputCol())
    my_indx_list.append(my_indx)
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=[x + "_vector" for x in inputs])
a_pipeline = Pipeline(stages = my_indx_list + [encoder])
a_pipeline.fit(my_df).transform(my_df).show() # error here!
But I got this error:
A column must be either string type or numeric type, but got ArrayType(StringType,true)
So, how can I solve this?
My idea: sort the list in each column and concatenate the strings in the list into one long string per column.
But for each column, the values are answers to some survey questions, and each answer carries the same weight, so I am not sure how to work this out.
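For illustration, I think the concatenation part would look roughly like this (just a sketch, using sort_array and concat_ws from pyspark.sql.functions):
from pyspark.sql import functions as F
# Sketch only: sort each array, then join its strings into one long string per column
concat_df = my_df
for a_col in ["payment", "shop"]:
    concat_df = concat_df.withColumn(a_col + "_str", F.concat_ws("_", F.sort_array(F.col(a_col))))
concat_df.show(truncate=False)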
UPDATE
The proposed solution works, but it is very slow. It took about 3.5 hours on a cluster with 300 GB of memory and 32 cores.
my code:
from pyspark.ml.feature import CountVectorizer
tmp_df = original_df # 3.5 million rows and 300 columns
for a_col in original_df.columns:
    a_vec = CountVectorizer(inputCol = a_col, outputCol = a_col + "_index", binary=True)
    tmp_df = a_vec.fit(tmp_df).transform(tmp_df)
tmp_df.show()
The "original_df" has 3.5 million rows and 300 columns.
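For reference, here is the same encoding written as a single Pipeline (just a sketch; I have not checked whether it is actually faster):
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
# Sketch: one CountVectorizer stage per column, fitted through a single Pipeline,
# assuming every column in original_df holds an array of strings, as in the loop above
stages = [CountVectorizer(inputCol=a_col, outputCol=a_col + "_index", binary=True) for a_col in original_df.columns]
encoded_df = Pipeline(stages=stages).fit(original_df).transform(original_df)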
How can I speed this up?
Thanks.
Upvotes: 3
Views: 1898
Reputation: 542
@jxc suggested the brilliant use of CountVectorizer for one-hot encoding in your case; it is usually used for counting tokens in natural language processing. Using CountVectorizer saves you the trouble of dealing with explode and collect_set together with OneHotEncoderEstimator, or worse, of trying to implement it with a udf.
Given this dataframe,
df = spark.createDataFrame([
{'id': 'dapd', 'payment': ['credit', 'cash'], 'shop': ['retail', 'on-line']},
{'id': 'wrfr', 'payment': ['cash', 'debit'], 'shop': ['supermarket', 'brand store']}
])
df.show()
+----+--------------+--------------------+
| id| payment| shop|
+----+--------------+--------------------+
|dapd|[credit, cash]| [retail, on-line]|
|wrfr| [cash, debit]|[supermarket, bra...|
+----+--------------+--------------------+
You can one-hot encode by treating the array of strings as tokens in natural language processing. Note the use of binary=True to force it to return only 0 or 1.
from pyspark.ml.feature import CountVectorizer
payment_cv = CountVectorizer(inputCol="payment", outputCol="paymentEnc", binary=True)
first_res_df = payment_cv.fit(df).transform(df)
shop_cv = CountVectorizer(inputCol="shop", outputCol="shopEnc", binary=True)
final_res_df = shop_cv.fit(first_res_df).transform(first_res_df)
final_res_df.show()
+----+--------------+--------------------+-------------------+-------------------+
| id| payment| shop| paymentEnc| shopEnc|
+----+--------------+--------------------+-------------------+-------------------+
|dapd|[credit, cash]| [retail, on-line]|(3,[0,2],[1.0,1.0])|(4,[0,3],[1.0,1.0])|
|wrfr| [cash, debit]|[supermarket, bra...|(3,[0,1],[1.0,1.0])|(4,[1,2],[1.0,1.0])|
+----+--------------+--------------------+-------------------+-------------------+
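Since your end goal is k-means, here is a minimal sketch of how the encoded columns could be fed to it, assuming VectorAssembler and KMeans from pyspark.ml (the column names features and cluster, and k=2, are just placeholders):
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# Assemble the one-hot encoded columns into a single feature vector
assembler = VectorAssembler(inputCols=["paymentEnc", "shopEnc"], outputCol="features")
features_df = assembler.transform(final_res_df)
# Fit k-means on the assembled features; k=2 is only a placeholder
kmeans = KMeans(k=2, seed=1, featuresCol="features", predictionCol="cluster")
clustered_df = kmeans.fit(features_df).transform(features_df)
clustered_df.select("id", "cluster").show()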
Upvotes: 3