Learnis

Reputation: 566

Spark SQL Split or Extract words from String of Words

I have a Spark DataFrame like the one below. I'm trying to split the content column into three more columns:

date   time    content
28may  11am    [ssid][customerid,shopid]
This is what I tried:

val personDF2 = personDF.withColumn("temp", split(col("content"), "\\[")).select(
  col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*)
The expected output is:

date  time  content                       col1   col2        col3
28may 11    [ssid][customerid,shopid]     ssid   customerid  shopid
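For reference, splitting on "\\[" alone leaves the closing "]" characters in place and produces an empty leading element. A minimal sketch of one way to get the expected layout, assuming the content always holds exactly three bracketed words (the regex is an assumption based on the sample row):

import org.apache.spark.sql.functions._

// strip the outer brackets, then split on "][" or "," in one pass
val personDF2 = personDF
  .withColumn("temp", split(regexp_replace(col("content"), "^\\[|\\]$", ""), "\\]\\[|,"))
  .select(col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col${i + 1}")): _*)
  .drop("temp")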

Upvotes: 2

Views: 1835

Answers (2)

Ged

Reputation: 18003

Assuming a String represents an Array of Words: got your request. You can also reduce the number of intermediate DataFrames to lower the load on the system. If there can be more than 10 columns, you may need zero-padded names such as c00, c01, etc., so that c10 sorts after c09; or simply use integers as column names. I leave that up to you; an illustrative sketch follows.
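As an illustration of the zero-padding idea (not part of the original answer), the column-name step near the end of the pipeline below could use lpad:

// sketch: zero-pad the word index so that c10 sorts after c09 in the pivot;
// df10 and vn are defined further down in this answer
val df11p = df10.select($"k", $"vn".getField("_1"),
  concat(lit("c"), lpad($"vn".getField("_2").cast("string"), 2, "0"))).toDF("k", "v", "c")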

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// Set up data
val df = spark.sparkContext.parallelize(Seq(
       ("A", "[foo][customerid,shopid][Donald,Trump,Esq][single]"),
       ("B", "[foo]")
     )).toDF("k", "v")

// Drop the closing brackets, then turn the commas into the same "[" delimiter
val df2 = df.withColumn("words_temp", regexp_replace($"v", lit("]"), lit("")))
val df3 = df2.withColumn("words_temp2", regexp_replace($"words_temp", lit(","), lit("["))).drop("words_temp")
// Strip the leading "[" so that split does not produce an empty first element
val df4 = df3.withColumn("words_temp3", expr("substring(words_temp2, 2, length(words_temp2))")).withColumn("cnt", expr("length(words_temp2)")).drop("words_temp2")
// Split on the remaining "[" delimiters to get one array of words per row
val df5 = df4.withColumn("words", split(col("words_temp3"), "\\[")).drop("words_temp3")
val df6 = df5.withColumn("num_words", size($"words"))
// One row per word
val df7 = df6.withColumn("v2", explode($"words"))

// Convert to an array of sorts via group by
val df8 = df7.groupBy("k")
             .agg(collect_list("v2"))
// Convert to an RDD of tuples and zip each word with its position so as to
// generate column names -- that is the trick that makes pivot usable
val rdd = df8.rdd
val rdd2 = rdd.map(row => (row.getAs[String](0), row.getAs[WrappedArray[String]](1).toArray))
val rdd3 = rdd2.map { case (k, list) => (k, list.zipWithIndex) }
val df9 = rdd3.toDF("k", "v")
val df10 = df9.withColumn("vn", explode($"v"))
val df11 = df10.select($"k", $"vn".getField("_1"), concat(lit("c"), $"vn".getField("_2"))).toDF("k", "v", "c")

// Final manipulation: pivot the generated column names into real columns
val result = df11.groupBy("k")
                 .pivot("c")
                 .agg(expr("coalesce(first(v),null)")) // null may never occur in your case; done for completeness and variable-length columns
result.show(100, false)

returns in this case:

+---+---+----------+------+------+-----+----+------+
|k  |c0 |c1        |c2    |c3    |c4   |c5  |c6    |
+---+---+----------+------+------+-----+----+------+
|B  |foo|null      |null  |null  |null |null|null  |
|A  |foo|customerid|shopid|Donald|Trump|Esq |single|
+---+---+----------+------+------+-----+----+------+
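As an aside (not part of the original answer), the RDD detour for numbering the words can be avoided with posexplode, which emits each element's position alongside it; a minimal sketch, assuming Spark 2.1+:

// sketch: posexplode yields (pos, word) pairs straight from df5's "words" array
val alt = df5
  .select($"k", posexplode($"words").as(Seq("pos", "v")))
  .withColumn("c", concat(lit("c"), $"pos"))
  .groupBy("k")
  .pivot("c")
  .agg(first("v"))
alt.show(100, false)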

Upvotes: 1

Ged

Reputation: 18003

Update: this answer is based on the original title, which stated an array of words. For the String case, see the other answer.

If you are new to Spark, there are a few things to note here. This can also be done with a Dataset and map, I assume; a hedged sketch of that idea is shown below. Here is a solution using DataFrames and RDDs. I might well investigate a complete Dataset approach in future, but this works for sure and at scale.
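A minimal sketch of the Dataset-and-map idea mentioned above (an assumption, not the author's code); it uses the same sample data as the setup below:

import org.apache.spark.sql.functions._
import spark.implicits._

// same sample data as the main solution's setup
val dsIn = Seq(
  ("A", Seq(Seq("foo", "bar"), Seq("Donald", "Trump", "Esq"), Seq("single"))),
  ("B", Seq(Seq("foo2", "bar2"), Seq("single2")))
).toDS()

// flatten and index the words in typed Scala, then pivot as in the main solution
val indexed = dsIn.flatMap { case (k, arrs) =>
  arrs.flatten.zipWithIndex.map { case (w, i) => (k, w, "c" + i) }
}.toDF("k", "v", "c")
val resultDS = indexed.groupBy("k").pivot("c").agg(first("v"))
resultDS.show(100, false)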

// Can amalgamate more steps

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// Set up data
val df = spark.sparkContext.parallelize(Seq(
    ("A", Array(Array("foo", "bar"), Array("Donald", "Trump", "Esq"), Array("single"))),
    ("B", Array(Array("foo2", "bar2"), Array("single2"))),
    ("C", Array(Array("foo3", "bar3", "x", "y", "z")))
  )).toDF("k", "v")
// Flatten via 2x explode; can be done more elegantly with a def or UDF, but keeping it simple here
val df2 = df.withColumn("v2", explode($"v"))
val df3 = df2.withColumn("v3", explode($"v2"))
// Convert to Array of sorts via group by
val df4 = df3.groupBy("k")
            .agg(collect_list("v3"))
// Convert to rdd Tuple and then find position so as to gen col names! That is the clue so as to be able to use pivot
val rdd = df4.rdd
val rdd2 = rdd.map(row => (row.getAs[String](0), row.getAs[WrappedArray[String]](1).toArray))
val rdd3 = rdd2.map { case (k, list) => (k, list.zipWithIndex) }
val df5 = rdd3.toDF("k", "v")
val df6 = df5.withColumn("vn", explode($"v"))
val df7 = df6.select($"k", $"vn".getField("_1"), concat(lit("c"),$"vn".getField("_2"))).toDF("k", "v", "c")

// Final manipulation: pivot the generated column names into real columns
val result = df7.groupBy("k")
                .pivot("c")
                .agg(expr("coalesce(first(v),null)")) // null may never occur in your case; done for completeness and variable-length columns
result.show(100, false)

This returns, in the correct column order:

+---+----+----+-------+-----+----+------+
|k  |c0  |c1  |c2     |c3   |c4  |c5    |
+---+----+----+-------+-----+----+------+
|B  |foo2|bar2|single2|null |null|null  |
|C  |foo3|bar3|x      |y    |z   |null  |
|A  |foo |bar |Donald |Trump|Esq |single|
+---+----+----+-------+-----+----+------+
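As an aside (not in the original answer), on Spark 2.4+ the double explode and the RDD detour can both be avoided with the built-in flatten and posexplode functions; a minimal sketch:

// sketch: flatten collapses the nested array, posexplode numbers the words
val alt = df
  .select($"k", posexplode(flatten($"v")).as(Seq("pos", "v")))
  .withColumn("c", concat(lit("c"), $"pos"))
  .groupBy("k")
  .pivot("c")
  .agg(first("v"))
alt.show(100, false)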

Upvotes: 0
