Gilad

Reputation: 618

PySpark DataFrame - Append Random Permutation of a Single Column

I'm using PySpark (a new thing for me). Suppose I have the following table:

+-------+-------+----------+
| Col1  | Col2  | Question |
+-------+-------+----------+
| val11 | val12 | q1       |
| val21 | val22 | q2       |
| val31 | val32 | q3       |
+-------+-------+----------+

I would like to append a new column, random_question, which is a permutation of the values in the Question column, so the result might look like this:

+-------+-------+----------+-----------------+
| Col1  | Col2  | Question | random_question |
+-------+-------+----------+-----------------+
| val11 | val12 | q1       | q2              |
| val21 | val22 | q2       | q3              |
| val31 | val32 | q3       | q1              |
+-------+-------+----------+-----------------+

I've tried to do that as follows:

df.withColumn(
    'random_question',
    df.orderBy(rand(seed=0))['question']
).createOrReplaceTempView('with_random_questions')

The problem is that the above code does append the required column but WITHOUT permuting the values in it.

What am I doing wrong and how can I fix this?

Thank you,

Gilad

Upvotes: 2

Views: 643

Answers (2)

figs_and_nuts

Reputation: 5753

The other answer (joining on monotonically_increasing_id) is wrong: you are not guaranteed to get the same set of ids in the two dataframes, so you will lose rows.

import pandas as pd
import pyspark.sql.functions as F

df = spark.createDataFrame(pd.DataFrame({'a': [1, 2, 3, 4], 'b': [10, 11, 12, 13], 'c': [100, 101, 102, 103]}))
questions = df.select(F.col('a').alias('random_question'))
random = questions.orderBy(F.rand())  # shuffled copy of the column
df = df.withColumn('row_id', F.monotonically_increasing_id())
random = random.withColumn('row_id', F.monotonically_increasing_id())
df.show()
random.show()

output on my system:

+---+---+---+-----------+
|  a|  b|  c|     row_id|
+---+---+---+-----------+
|  1| 10|100| 8589934592|
|  2| 11|101|25769803776|
|  3| 12|102|42949672960|
|  4| 13|103|60129542144|
+---+---+---+-----------+

+---------------+-----------+
|random_question|     row_id|
+---------------+-----------+
|              4|          0|
|              1| 8589934592|
|              2|17179869184|
|              3|25769803776|
+---------------+-----------+

The row_id values do not match: monotonically_increasing_id depends on the partition layout, and the orderBy(rand()) shuffle changes that layout, so the join silently drops rows. Use the following utility to add permuted columns in place of the original columns or as new columns:

from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql.functions import rand
from pyspark.sql import Row

def permute_col_maintain_corr_join(df, colnames, newnames=[], replace=False):
    '''
    colnames: list of columns to be permuted
    newnames: list of new names for the permuted columns
    replace: whether to add the permuted columns as new columns or replace the original columns
    '''
    def flattener(rdd_1):
        # merge the original Row with the unique id produced by zipWithUniqueId
        r1 = rdd_1[0].asDict()
        idx = rdd_1[1]
        combined_dict = {**r1, **{'index': idx}}
        out_row = Row(**combined_dict)
        return out_row
    def compute_schema_wid(df):
        # original schema plus the 'index' column
        # (LongType, since zipWithUniqueId ids can exceed the 32-bit range)
        dfs = df.schema.fields
        ids = StructField('index', LongType(), False)
        return StructType(dfs + [ids])
    if not newnames:
        newnames = [f'{i}_sha' for i in colnames]
    assert len(colnames) == len(newnames)
    if not replace:
        assert not len(set(df.columns).intersection(set(newnames))), 'with replace False newnames cannot contain a column name from df'
    else:
        _rc = set(df.columns) - set(colnames)
        assert not len(_rc.intersection(set(newnames))), 'with replace True newnames cannot contain a column name from df other than one from colnames'
    # copy of the columns to permute, under their new names
    df_ts = df.select(*colnames).toDF(*newnames)
    if replace:
        df = df.drop(*colnames)
    # shuffle the copied columns
    df_ts = df_ts.orderBy(rand())
    df_ts_s = compute_schema_wid(df_ts)
    df_s = compute_schema_wid(df)
    # attach a dense unique id to both sides so the join cannot drop rows
    df_ts = df_ts.rdd.zipWithUniqueId().map(flattener).toDF(schema=df_ts_s)
    df = df.rdd.zipWithUniqueId().map(flattener).toDF(schema=df_s)
    df = df.join(df_ts, on='index').drop('index')
    return df
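
A minimal usage sketch for the question's case, assuming a DataFrame df with a Question column as in the original table:

# append a permuted copy of 'Question' as a new column 'random_question'
result = permute_col_maintain_corr_join(df, ['Question'], newnames=['random_question'])
result.show()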

Upvotes: 0

Sequinex

Reputation: 671

This should do the trick:

import pyspark.sql.functions as F

questions = df.select(F.col('Question').alias('random_question'))
random = questions.orderBy(F.rand())

Give the dataframes a unique row id:

df = df.withColumn('row_id', F.monotonically_increasing_id())
random = random.withColumn('row_id', F.monotonically_increasing_id())

Join them by row id:

final_df = df.join(random, 'row_id')
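
Putting the steps together, a minimal end-to-end sketch (assuming an active SparkSession named spark and the sample data from the question):

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [('val11', 'val12', 'q1'), ('val21', 'val22', 'q2'), ('val31', 'val32', 'q3')],
    ['Col1', 'Col2', 'Question'])

# shuffled copy of the Question column
questions = df.select(F.col('Question').alias('random_question'))
random = questions.orderBy(F.rand())

# give both dataframes a row id and join on it
df = df.withColumn('row_id', F.monotonically_increasing_id())
random = random.withColumn('row_id', F.monotonically_increasing_id())

final_df = df.join(random, 'row_id')
final_df.show()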

Upvotes: 3
