Josh

Reputation: 768

How can I take a dataframe containing lists of strings and create another dataframe from these lists in PySpark?

Suppose I have a dataframe that looks like this:

+--------------------+
|        ColA        |
+--------------------+
| [val1, val2, val3] |
+--------------------+
| [val4, val5, val6] |
+--------------------+
| [val7, val8, val9] |
+--------------------+

How can I create a new dataframe that would look like this?

+------+------+------+
| Col1 | Col2 | Col3 |
+------+------+------+
| val1 | val2 | val3 |
+------+------+------+
| val4 | val5 | val6 |
+------+------+------+
| val7 | val8 | val9 |
+------+------+------+

Upvotes: 0

Views: 265

Answers (2)

abiratsis

Reputation: 7316

Here are some options, using either map through the RDD API or a single select expression.

First, let's create some sample data and extract the column names from the first row of the dataset. The precondition here is that all the arrays in the dataset must have the same length:

from pyspark.sql import Row

df = spark.createDataFrame(
[[["val1", "val2", "val3"]],
[["val4", "val5", "val6"]],
[["val7", "val8", "val9"]]], ["ColA"])

# get the length of the 1st item; it should be the same for all the items in the dataset
ar_len = len(df.first()["ColA"])

# generate col names
col_names = ["col" + str(i + 1) for i in range(0, ar_len)]

col_names
# ['col1', 'col2', 'col3']

Option1: Map + Row

def to_row(l):
  # Row(*col_names) defines the schema (field names) of the Row
  r = Row(*col_names)

  # fill the Row defined above with the values of the array
  r = r(*l[0])
  return r

df.rdd.map(to_row).toDF().show()

You should first declare the col_names list, which must be the same size as each array item. Then Row(*col_names) creates the desired schema of the Row. Finally, r(*l[0]) fills the previously created Row with the values of the list.
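For reference, here is a minimal sketch of how this two-step Row construction behaves on plain Python values (the hard-coded names and values are just for illustration):

from pyspark.sql import Row

r = Row("col1", "col2", "col3")     # a Row "template" holding only the field names
print(r("val1", "val2", "val3"))    # Row(col1='val1', col2='val2', col3='val3')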

Option2: Map + tuples

df.rdd.map(lambda l: (*l[0],)).toDF(col_names).show()

Here we simply unpack all the items of the list into a new tuple.
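To make the unpacking concrete, this is roughly what the lambda produces for a single row (using a hand-built Row just for illustration):

from pyspark.sql import Row

row = Row(ColA=["val1", "val2", "val3"])
print((*row[0],))   # ('val1', 'val2', 'val3') - a plain tuple that toDF(col_names) can consume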

Option3: select statement

import pyspark.sql.functions as f

cols = [f.col('ColA').getItem(i).alias(c) for i,c in enumerate(col_names)]

df.select(*cols).show()

Output:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|val1|val2|val3|
|val4|val5|val6|
|val7|val8|val9|
+----+----+----+
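As a side note, the same select can also be written with bracket indexing on the column, which should give an identical result (assuming the col_names list defined above):

df.select([df["ColA"][i].alias(c) for i, c in enumerate(col_names)]).show()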

Upvotes: 0

cph_sto

Reputation: 7585

This code is robust enough to handle any number of elements in the arrays, although the OP has 3 elements in each array. We start by creating the said DataFrame.

# Loading requisite packages.
from pyspark.sql.functions import col, explode, first, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
df = sqlContext.createDataFrame([(['val1', 'val2', 'val3'],),
                                 (['val4', 'val5', 'val6'],),
                                 (['val7', 'val8', 'val9'],)],['ColA',])
df.show()
+------------------+
|              ColA|
+------------------+
|[val1, val2, val3]|
|[val4, val5, val6]|
|[val7, val8, val9]|
+------------------+

Since we want each element of the individual arrays to end up in its respective column, as a first step we build a mapping between column name and value. We create a user-defined function (UDF) to achieve this.

def func(c):
    return [['Col'+str(i+1),c[i]] for i in range(len(c))]
func_udf = udf(func, ArrayType(StructType([
    StructField('a', StringType()),
    StructField('b', StringType())
])))
df = df.withColumn('ColA_new',func_udf(col('ColA')))
df.show(truncate=False)
+------------------+---------------------------------------+
|ColA              |ColA_new                               |
+------------------+---------------------------------------+
|[val1, val2, val3]|[[Col1,val1], [Col2,val2], [Col3,val3]]|
|[val4, val5, val6]|[[Col1,val4], [Col2,val5], [Col3,val6]]|
|[val7, val8, val9]|[[Col1,val7], [Col2,val8], [Col3,val9]]|
+------------------+---------------------------------------+

Once this has been done, we explode the DataFrame.

# Step 1: Explode the DataFrame
df=df.withColumn('vals', explode('ColA_new')).drop('ColA_new')
df.show()
+------------------+-----------+
|              ColA|       vals|
+------------------+-----------+
|[val1, val2, val3]|[Col1,val1]|
|[val1, val2, val3]|[Col2,val2]|
|[val1, val2, val3]|[Col3,val3]|
|[val4, val5, val6]|[Col1,val4]|
|[val4, val5, val6]|[Col2,val5]|
|[val4, val5, val6]|[Col3,val6]|
|[val7, val8, val9]|[Col1,val7]|
|[val7, val8, val9]|[Col2,val8]|
|[val7, val8, val9]|[Col3,val9]|
+------------------+-----------+

Once exploded, we extract the first and second elements, which were named a and b respectively in the UDF.

df=df.withColumn('column_name', col('vals').getItem('a'))
df=df.withColumn('value', col('vals').getItem('b')).drop('vals')
df.show()
+------------------+-----------+-----+
|              ColA|column_name|value|
+------------------+-----------+-----+
|[val1, val2, val3]|       Col1| val1|
|[val1, val2, val3]|       Col2| val2|
|[val1, val2, val3]|       Col3| val3|
|[val4, val5, val6]|       Col1| val4|
|[val4, val5, val6]|       Col2| val5|
|[val4, val5, val6]|       Col3| val6|
|[val7, val8, val9]|       Col1| val7|
|[val7, val8, val9]|       Col2| val8|
|[val7, val8, val9]|       Col3| val9|
+------------------+-----------+-----+

As a last step, we pivot the DataFrame back to obtain the final result. Since pivoting involves an aggregation, we aggregate with first(), which takes the first element of each group; here every ColA/column_name pair holds exactly one value, so nothing is lost.

# Step 2: Pivot it back.
df = df.groupby('ColA').pivot('column_name').agg(first('value')).drop('ColA')
df.show()
+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
|val1|val2|val3|
|val4|val5|val6|
|val7|val8|val9|
+----+----+----+

Upvotes: 1
