Ali Hassan

Flatten a nested array in a PySpark DataFrame into columns

Hi, I have JSON data that I am loading in PySpark. A sample is below:

{
    "data": [
        ["row-r9pv-p86t.ifsp", "00000000-0000-0000-0838-60C2FFCC43AE", 0, 1574264158, null, 1574264158, null, "{ }", "2007", "ZOEY", "KINGS", "F", "11"],
        ["row-7v2v~88z5-44se", "00000000-0000-0000-C8FC-DDD3F9A72DFF", 0, 1574264158, null, 1574264158, null, "{ }", "2007", "ZOEY", "SUFFOLK", "F", "6"],
        ["row-hzc9-4kvv~mbc9", "00000000-0000-0000-562E-D9A0792557FC", 0, 1574264158, null, 1574264158, null, "{ }", "2007", "ZOEY", "MONROE", "F", "6"]
    ]
}

I am trying to explode the nested array so that each record becomes a single row of the DataFrame, but the result looks like this:

from pyspark.sql.functions import explode

df = spark.read.json('data/rows.json', multiLine=True)
temp_df = df.select(explode("data").alias("data"))
temp_df.show(n=3, truncate=False)

Result:

+-----------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                   |
+-----------------------------------------------------------------------------------------------------------------------+
|[row-r9pv-p86t.ifsp, 00000000-0000-0000-0838-60C2FFCC43AE, 0, 1574264158,, 1574264158,, { }, 2007, ZOEY, KINGS, F, 11] |
|[row-7v2v~88z5-44se, 00000000-0000-0000-C8FC-DDD3F9A72DFF, 0, 1574264158,, 1574264158,, { }, 2007, ZOEY, SUFFOLK, F, 6]|
|[row-hzc9-4kvv~mbc9, 00000000-0000-0000-562E-D9A0792557FC, 0, 1574264158,, 1574264158,, { }, 2007, ZOEY, MONROE, F, 6] |
+-----------------------------------------------------------------------------------------------------------------------+

from pyspark.sql.functions import flatten

temp_df.printSchema()
temp_df.show(5)
temp_df.select(flatten(temp_df.data)).show(n=10)  # this line fails

So far so good, but when I try to flatten the array in each row using the flatten function, I get this error: cannot resolve 'flatten('data')' due to data type mismatch: The argument should be an array of arrays, but 'data' is of array<string> type. That makes sense, but I am not sure how to flatten the array.

Do I need to write a custom map function to map each row's array onto DataFrame columns?

Here is the schema of the DataFrame: [schema image not included]

Answers (2)

Ali Hassan

Answering my own question so it can help anyone who needs it.

  1. Read the source data from the file and explode the outer array:

from pyspark.sql.functions import explode

df = spark.read.json('data/rows.json', multiLine=True)
temp_df = df.select(explode("data").alias("data"))
temp_df.show(n=3, truncate=False)

Results:

+-----------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                   |
+-----------------------------------------------------------------------------------------------------------------------+
|[row-r9pv-p86t.ifsp, 00000000-0000-0000-0838-60C2FFCC43AE, 0, 1574264158,, 1574264158,, { }, 2007, ZOEY, KINGS, F, 11] |
|[row-7v2v~88z5-44se, 00000000-0000-0000-C8FC-DDD3F9A72DFF, 0, 1574264158,, 1574264158,, { }, 2007, ZOEY, SUFFOLK, F, 6]|
|[row-hzc9-4kvv~mbc9, 00000000-0000-0000-562E-D9A0792557FC, 0, 1574264158,, 1574264158,, { }, 2007, ZOEY, MONROE, F, 6] |
+-----------------------------------------------------------------------------------------------------------------------+

  2. In the DataFrame above, each cell contains an array of strings, but I need each element in its own column with a specific data type, so I pull out each item by index and cast it:

from pyspark.sql.types import StringType, IntegerType, TimestampType

# Pull each array element out by position and cast it to the target type
df = temp_df.withColumn("sid", temp_df["data"].getItem(0).cast(StringType())) \
       .withColumn("id", temp_df["data"].getItem(1).cast(IntegerType())) \
       .withColumn("position", temp_df["data"].getItem(2).cast(IntegerType())) \
       .withColumn("created_at", temp_df["data"].getItem(3).cast(TimestampType())) \
       .withColumn("created_meta", temp_df["data"].getItem(4).cast(StringType())) \
       .withColumn("updated_at", temp_df["data"].getItem(5).cast(TimestampType())) \
       .withColumn("updated_meta", temp_df["data"].getItem(6).cast(StringType())) \
       .withColumn("meta", temp_df["data"].getItem(7).cast(StringType())) \
       .withColumn("Year", temp_df["data"].getItem(8).cast(IntegerType())) \
       .withColumn("First Name", temp_df["data"].getItem(9).cast(StringType())) \
       .withColumn("County", temp_df["data"].getItem(10).cast(StringType())) \
       .withColumn("Sex", temp_df["data"].getItem(11).cast(StringType())) \
       .withColumn("Count", temp_df["data"].getItem(12).cast(IntegerType())) \
       .drop("data")
df.show()
df.printSchema()

+------------------+----+--------+----------+------------+----------+------------+----+----+----------+-------+---+-----+
|               sid|  id|position|created_at|created_meta|updated_at|updated_meta|meta|Year|First Name| County|Sex|Count|
+------------------+----+--------+----------+------------+----------+------------+----+----+----------+-------+---+-----+
|row-r9pv-p86t.ifsp|null|       0|      null|        null|      null|        null| { }|2007|      ZOEY|  KINGS|  F|   11|
|row-7v2v~88z5-44se|null|       0|      null|        null|      null|        null| { }|2007|      ZOEY|SUFFOLK|  F|    6|
|row-hzc9-4kvv~mbc9|null|       0|      null|        null|      null|        null| { }|2007|      ZOEY| MONROE|  F|    6|
+------------------+----+--------+----------+------------+----------+------------+----+----+----------+-------+---+-----+

==================== SCHEMA ====================

root
 |-- sid: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- created_meta: string (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- updated_meta: string (nullable = true)
 |-- meta: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- First Name: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Count: integer (nullable = true)

mvasyliv

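import spark.implicits._  // enables the 'data column syntax

// temp_df: the exploded DataFrame from the question
val resDF = temp_df.select(
  'data.getItem(0).alias("c0"),
  'data.getItem(1).alias("c1"),
  'data.getItem(2).alias("c2"),
  'data.getItem(3).alias("c3")
  // ...
)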
resDF.show(false)
//  +------------------+------------------------------------+---+----------+
//  |c0                |c1                                  |c2 |c3        |
//  +------------------+------------------------------------+---+----------+
//  |row-r9pv-p86t.ifsp|00000000-0000-0000-0838-60C2FFCC43AE|0  |1574264158|
//  |row-7v2v~88z5-44se|00000000-0000-0000-C8FC-DDD3F9A72DFF|0  |1574264158|
//  |row-hzc9-4kvv~mbc9|00000000-0000-0000-562E-D9A0792557FC|0  |1574264158|
//  +------------------+------------------------------------+---+----------+

V2 (using withColumn and concat_ws):

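  import org.apache.spark.sql.functions.{concat_ws, split}
  import spark.implicits._  // toDF and the 'column syntax

  val sourceDF = Seq(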
    Array("row-r9pv-p86t.ifsp", "00000000-0000-0000-0838-60C2FFCC43AE", "0", "1574264158", "", "1574264158", "", "{ }", "2007", "ZOEY", "KINGS", "F", "11"),
    Array("row-7v2v~88z5-44se", "00000000-0000-0000-C8FC-DDD3F9A72DFF", "0", "1574264158", "", "1574264158", "", "{ }", "2007", "ZOEY", "SUFFOLK", "F", "6"),
    Array("row-hzc9-4kvv~mbc9", "00000000-0000-0000-562E-D9A0792557FC", "0", "1574264158", "", "1574264158", "", "{ }", "2007", "ZOEY", "MONROE", "F", "6")
  ).toDF("dataColumn")

  sourceDF.show(false)

//  +-------------------------------------------------------------------------------------------------------------------------+
//  |dataColumn                                                                                                               |
//  +-------------------------------------------------------------------------------------------------------------------------+
//  |[row-r9pv-p86t.ifsp, 00000000-0000-0000-0838-60C2FFCC43AE, 0, 1574264158, , 1574264158, , { }, 2007, ZOEY, KINGS, F, 11] |
//  |[row-7v2v~88z5-44se, 00000000-0000-0000-C8FC-DDD3F9A72DFF, 0, 1574264158, , 1574264158, , { }, 2007, ZOEY, SUFFOLK, F, 6]|
//  |[row-hzc9-4kvv~mbc9, 00000000-0000-0000-562E-D9A0792557FC, 0, 1574264158, , 1574264158, , { }, 2007, ZOEY, MONROE, F, 6] |
//  +-------------------------------------------------------------------------------------------------------------------------+


  val df1 = sourceDF
    .withColumn("dataString", concat_ws(", ", 'dataColumn))
    .select('dataString)

  df1.printSchema()

  df1.show(false)
//  root
//  |-- dataString: string (nullable = false)
//
//  +-----------------------------------------------------------------------------------------------------------------------+
//  |dataString                                                                                                             |
//  +-----------------------------------------------------------------------------------------------------------------------+
//  |row-r9pv-p86t.ifsp, 00000000-0000-0000-0838-60C2FFCC43AE, 0, 1574264158, , 1574264158, , { }, 2007, ZOEY, KINGS, F, 11 |
//  |row-7v2v~88z5-44se, 00000000-0000-0000-C8FC-DDD3F9A72DFF, 0, 1574264158, , 1574264158, , { }, 2007, ZOEY, SUFFOLK, F, 6|
//  |row-hzc9-4kvv~mbc9, 00000000-0000-0000-562E-D9A0792557FC, 0, 1574264158, , 1574264158, , { }, 2007, ZOEY, MONROE, F, 6 |
//  +-----------------------------------------------------------------------------------------------------------------------+

  // Split once, then pull out positions 0..12 as columns c0..c12
  val parts = split('dataString, ", ")
  val df2 = df1.select((0 to 12).map(i => parts.getItem(i).alias(s"c$i")): _*)
  df2.printSchema()
//  root
//  |-- c0: string (nullable = true)
//  |-- c1: string (nullable = true)
//  |-- c2: string (nullable = true)
//  |-- c3: string (nullable = true)
//  |-- c4: string (nullable = true)
//  |-- c5: string (nullable = true)
//  |-- c6: string (nullable = true)
//  |-- c7: string (nullable = true)
//  |-- c8: string (nullable = true)
//  |-- c9: string (nullable = true)
//  |-- c10: string (nullable = true)
//  |-- c11: string (nullable = true)
//  |-- c12: string (nullable = true)

  df2.show(false)
//  +------------------+------------------------------------+---+----------+---+----------+---+---+----+----+-------+---+---+
//  |c0                |c1                                  |c2 |c3        |c4 |c5        |c6 |c7 |c8  |c9  |c10    |c11|c12|
//  +------------------+------------------------------------+---+----------+---+----------+---+---+----+----+-------+---+---+
//  |row-r9pv-p86t.ifsp|00000000-0000-0000-0838-60C2FFCC43AE|0  |1574264158|   |1574264158|   |{ }|2007|ZOEY|KINGS  |F  |11 |
//  |row-7v2v~88z5-44se|00000000-0000-0000-C8FC-DDD3F9A72DFF|0  |1574264158|   |1574264158|   |{ }|2007|ZOEY|SUFFOLK|F  |6  |
//  |row-hzc9-4kvv~mbc9|00000000-0000-0000-562E-D9A0792557FC|0  |1574264158|   |1574264158|   |{ }|2007|ZOEY|MONROE |F  |6  |
//  +------------------+------------------------------------+---+----------+---+----------+---+---+----+----+-------+---+---+
