Mohd Zoubi

Reputation: 186

Explode multiple columns in Spark SQL table

There was a question regarding this issue here:

Explode (transpose?) multiple columns in Spark SQL table

Suppose that we have extra columns as below:

userId    someString      varA       varB         varC      varD
   1      "example1"      [0,2,5]    [1,2,9]      [a,b,c]   [red,green,yellow]
   2      "example2"      [1,20,5]   [9,null,6]   [d,e,f]   [white,black,cyan]

We want to obtain an output like the one below:

userId    someString      varA     varB   varC     varD
   1      "example1"       0         1     a       red
   1      "example1"       2         2     b       green
   1      "example1"       5         9     c       yellow
   2      "example2"       1         9     d       white
   2      "example2"       20       null   e       black
   2      "example2"       5         6     f       Cyan

The answer there was to define a udf:

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

and then use it with withColumn and explode:

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
   $"userId", $"someString",
   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

If we need to extend the above answer to more columns, what is the easiest way to amend the code? Any help is appreciated.

Upvotes: 6

Views: 7180

Answers (3)

Vandana Bachu

Reputation: 1

If you want to extend the UDF for more columns, do as below:

// Zips three array columns element-wise into an array of 3-field structs
val zip = udf((xs: Seq[String], ys: Seq[String], zs: Seq[String]) =>
  for (((x, y), z) <- xs zip ys zip zs) yield (x, y, z))

df.withColumn("vars", explode(zip($"varA", $"varB", $"varC"))).select(
  $"userId", $"someString", $"vars._1".alias("varA"),
  $"vars._2".alias("varB"),$"vars._3".alias("varC")).show

This logic can be applied to n columns as required.
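For instance, a sketch of the same for-comprehension pattern extended to four columns (the element types are assumed to be strings, as in the snippet above; use Seq[Long] or similar for numeric array columns):

val zip4 = udf((as: Seq[String], bs: Seq[String], cs: Seq[String], ds: Seq[String]) =>
  for ((((a, b), c), d) <- as zip bs zip cs zip ds) yield (a, b, c, d))

The resulting struct then has fields _1 to _4, so the select above just gains $"vars._4".alias("varD").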

Upvotes: 0

Raphael Roth

Reputation: 27373

The approach with the zip udf seems ok, but you need to extend it for more collections. Unfortunately, there is no really nice way to zip 4 Seqs, but this should work:

// Fail fast if the arrays to be zipped have different lengths
def assertSameSize(arrs: Seq[_]*) = {
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")
}

// Zips four array columns element-wise into an array of 4-field structs
val zip4 = udf((xa: Seq[Long], xb: Seq[Long], xc: Seq[String], xd: Seq[String]) => {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
})
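The udf can then be plugged into the same withColumn/explode pattern as in the question (a sketch only; column and alias names are taken from the question's example):

df.withColumn("vars", explode(zip4($"varA", $"varB", $"varC", $"varD"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB"),
  $"vars._3".alias("varC"), $"vars._4".alias("varD")).show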

Upvotes: 3

Ankush Singh

Reputation: 560

I am assuming that the sizes of varA, varB, varC and varD are the same, as in your example.

scala> case class Input(user_id : Integer,someString : String, varA : Array[Integer],varB : Array[Integer],varC : Array[String], varD : Array[String])
defined class Input

scala> case class Result(user_id : Integer,someString : String , varA : Integer,varB : Integer,varC : String, varD : String)
defined class Result

scala> val obj1 = Input(1,"example1",Array(0,2,5),Array(1,2,9),Array("a","b","c"),Array("red","green","yellow"))
obj1: Input = Input(1,example1,[Ljava.lang.Integer;@77c43ec2,[Ljava.lang.Integer;@3a332d08,[Ljava.lang.String;@5c1222da,[Ljava.lang.String;@114e051a)

scala> val obj2 = Input(2,"example2",Array(1,20,5),Array(9,null,6),Array("d","e","f"),Array("white","black","cyan"))
obj2: Input = Input(2,example2,[Ljava.lang.Integer;@326db38,[Ljava.lang.Integer;@50914458,[Ljava.lang.String;@339b73ae,[Ljava.lang.String;@1567ee0a)

scala> val input_df = sc.parallelize(Seq(obj1,obj2)).toDS
input_df: org.apache.spark.sql.Dataset[Input] = [user_id: int, someString: string ... 4 more fields]

scala> input_df.show
+-------+----------+----------+------------+---------+--------------------+
|user_id|someString|      varA|        varB|     varC|                varD|
+-------+----------+----------+------------+---------+--------------------+
|      1|  example1| [0, 2, 5]|   [1, 2, 9]|[a, b, c]|[red, green, yellow]|
|      2|  example2|[1, 20, 5]|[9, null, 6]|[d, e, f]|[white, black, cyan]|
+-------+----------+----------+------------+---------+--------------------+

scala> def getResult(row : Input) : Iterable[Result] = {
     |             val user_id = row.user_id
     |             val someString = row.someString
     |             val varA = row.varA
     |             val varB = row.varB
     |             val varC = row.varC
     |             val varD = row.varD
     |             val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i),varC(i),varD(i))}
     |             seq.toSeq
     |         }
getResult: (row: Input)Iterable[Result]

scala> val resdf = input_df.flatMap{row => getResult(row)}
resdf: org.apache.spark.sql.Dataset[Result] = [user_id: int, someString: string ... 4 more fields]

scala> resdf.show
+-------+----------+----+----+----+------+
|user_id|someString|varA|varB|varC|  varD|
+-------+----------+----+----+----+------+
|      1|  example1|   0|   1|   a|   red|
|      1|  example1|   2|   2|   b| green|
|      1|  example1|   5|   9|   c|yellow|
|      2|  example2|   1|   9|   d| white|
|      2|  example2|  20|null|   e| black|
|      2|  example2|   5|   6|   f|  cyan|
+-------+----------+----+----+----+------+

If the sizes of the columns varA, varB, varC or varD differ, then those scenarios need to be handled.

You could iterate up to the maximum size and output null values where a value is not present in a column, either by handling the exceptions or by checking the bounds, as sketched below.
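A minimal sketch of that idea, reusing the Input and Result case classes above (getResultPadded is a hypothetical helper name). Instead of catching exceptions it checks bounds with lift, which returns None past the end of an array, and orNull then turns a missing value into null (the Result fields are already nullable Integer/String):

// Sketch only: iterate up to the longest array and emit null where a value is missing
def getResultPadded(row: Input): Iterable[Result] = {
  val maxSize = Seq(row.varA.size, row.varB.size, row.varC.size, row.varD.size).max
  (0 until maxSize).map { i =>
    Result(row.user_id, row.someString,
      row.varA.lift(i).orNull,   // lift(i) is None when i is out of bounds
      row.varB.lift(i).orNull,
      row.varC.lift(i).orNull,
      row.varD.lift(i).orNull)
  }
}

input_df.flatMap(row => getResultPadded(row)) would then produce one output row per index, up to the length of the longest array.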

Upvotes: 3
