Reputation: 618
I'm using PySpark (a new thing for me). Now, suppose I have the following table:
+-------+-------+----------+
| Col1 | Col2 | Question |
+-------+-------+----------+
| val11 | val12 | q1 |
| val21 | val22 | q2 |
| val31 | val32 | q3 |
+-------+-------+----------+
and I would like to append to it a new column, random_question, which is a
permutation of the values in the Question column, so the result might look
like this:
+-------+-------+----------+-----------------+
| Col1 | Col2 | Question | random_question |
+-------+-------+----------+-----------------+
| val11 | val12 | q1 | q2 |
| val21 | val22 | q2 | q3 |
| val31 | val32 | q3 | q1 |
+-------+-------+----------+-----------------+
I've tried to do that as follows:
df.withColumn(
    'random_question',
    df.orderBy(rand(seed=0))['question']
).createOrReplaceTempView('with_random_questions')
The problem is that the above code does append the required column but WITHOUT permuting the values in it.
What am I doing wrong and how can I fix this?
Thank you,
Gilad
Upvotes: 2
Views: 643
Reputation: 5753
The above answer is wrong: monotonically_increasing_id does not guarantee that the two dataframes get the same set of ids, so the join will silently drop rows.
import pandas as pd
import pyspark.sql.functions as F

# 'spark' is the active SparkSession (as in the pyspark shell)
df = spark.createDataFrame(pd.DataFrame({'a': [1, 2, 3, 4], 'b': [10, 11, 12, 13], 'c': [100, 101, 102, 103]}))
questions = df.select(F.col('a').alias('random_question'))
random = questions.orderBy(F.rand())
df = df.withColumn('row_id', F.monotonically_increasing_id())
random = random.withColumn('row_id', F.monotonically_increasing_id())
df.show()
random.show()
output on my system:
+---+---+---+-----------+
| a| b| c| row_id|
+---+---+---+-----------+
| 1| 10|100| 8589934592|
| 2| 11|101|25769803776|
| 3| 12|102|42949672960|
| 4| 13|103|60129542144|
+---+---+---+-----------+
+---------------+-----------+
|random_question| row_id|
+---------------+-----------+
| 4| 0|
| 1| 8589934592|
| 2|17179869184|
| 3|25769803776|
+---------------+-----------+
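Joining these two dataframes on row_id only keeps the ids they share (8589934592 and 25769803776 in the output above) and silently drops the other rows. A quick check, using the dataframes from the snippet above:
# only row_ids present in both dataframes survive the join;
# with the ids shown above that is just 2 of the 4 rows
df.join(random, 'row_id').show()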
Use the following utility to add permuted columns, either in place of the original columns or as new columns:
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import rand, col
from pyspark.sql import Row

def permute_col_maintain_corr_join(df, colnames, newnames=[], replace=False):
    '''
    colnames: list of columns to be permuted
    newnames: list of new names for the permuted columns
    replace:  if True, the permuted columns replace the originals;
              if False, they are added as new columns
    '''
    def flattener(rdd_1):
        # merge the row's fields with its unique id under the key 'index'
        r1 = rdd_1[0].asDict()
        idx = rdd_1[1]
        combined_dict = {**r1, **{'index': idx}}
        out_row = Row(**combined_dict)
        return out_row
    def compute_schema_wid(df):
        # original schema plus an integer 'index' field
        dfs = df.schema.fields
        ids = StructField('index', IntegerType(), False)
        return StructType(dfs + [ids])
    if not newnames:
        newnames = [f'{i}_sha' for i in colnames]
    assert len(colnames) == len(newnames)
    if not replace:
        assert not len(set(df.columns).intersection(set(newnames))), \
            'with replace False newnames cannot contain a column name from df'
    else:
        _rc = set(df.columns) - set(colnames)
        assert not len(_rc.intersection(set(newnames))), \
            'with replace True newnames cannot contain a column name from df other than one from colnames'
    df_ts = df.select(*colnames).toDF(*newnames)
    if replace:
        df = df.drop(*colnames)
    # shuffle the selected columns, then give both dataframes an id from zipWithUniqueId and join on it
    df_ts = df_ts.orderBy(rand())
    df_ts_s = compute_schema_wid(df_ts)
    df_s = compute_schema_wid(df)
    df_ts = df_ts.rdd.zipWithUniqueId().map(flattener).toDF(schema=df_ts_s)
    df = df.rdd.zipWithUniqueId().map(flattener).toDF(schema=df_s)
    df = df.join(df_ts, on='index').drop('index')
    return df
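For the dataframe from the question, a minimal usage sketch (assuming df has the Col1, Col2 and Question columns shown there):
# add a permuted copy of 'Question' as a new 'random_question' column,
# keeping every original row intact
result = permute_col_maintain_corr_join(df, ['Question'], newnames=['random_question'])
result.show()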
Upvotes: 0
Reputation: 671
This should do the trick:
import pyspark.sql.functions as F
questions = df.select(F.col('Question').alias('random_question'))
random = questions.orderBy(F.rand())
Give the dataframes a unique row id:
df = df.withColumn('row_id', F.monotonically_increasing_id())
random = random.withColumn('row_id', F.monotonically_increasing_id())
Join them by row id:
final_df = df.join(random, 'row_id')
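If you don't want the helper column in the result, you can drop it afterwards (a small follow-up using the names above):
# row_id was only needed to line the two dataframes up
final_df = final_df.drop('row_id')
final_df.show()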
Upvotes: 3