Surender Raja

Reputation: 3609

How to generate multiple records based on column?

I have records like the ones below. Whenever the third attribute is All, I would like to turn that single record into two records, one with the value INTERNAL and one with EXTERNAL.

Input dataset:

Surender,cts,INTERNAL
Raja,cts,EXTERNAL
Ajay,tcs,All

Expected output:

Surender,cts,INTERNAL
Raja,cts,EXTERNAL
Ajay,tcs,INTERNAL
Ajay,tcs,EXTERNAL

My Spark Code :

case class Customer(name:String,organisation:String,campaign_type:String)

val custRDD = sc.textFile("/user/cloudera/input_files/customer.txt")

val mapRDD = custRDD.map(record => record.split(","))
    .map(arr => (arr(0), arr(1), arr(2)))
    .map(tuple => {
      val name            = tuple._1.trim
      val organisation    = tuple._2.trim
      val campaign_type   = tuple._3.trim.toUpperCase
      Customer(name, organisation, campaign_type)
    })

mapRDD.toDF().registerTempTable("customer_processed")

sqlContext.sql("SELECT * FROM customer_processed").show

Could someone help me fix this code so that it produces the expected output above?

Upvotes: 1

Views: 565

Answers (2)

eliasah

Reputation: 40370

You can use a udf to map the campaign_type column to a Seq of strings, and then explode it:

import org.apache.spark.sql.functions._   // udf, explode, when, array, lit, col

val campaignType_ : (String => Seq[String]) = {
  case s if s == "ALL" => Seq("EXTERNAL", "INTERNAL")
  case s => Seq(s)
}

val campaignType = udf(campaignType_)

val df = Seq(("Surender", "cts", "INTERNAL"),
  ("Raja", "cts", "EXTERNAL"),
  ("Ajay", "tcs", "ALL"))
  .toDF("name", "organisation", "campaign_type")

val step1 = df.withColumn("campaign_type", campaignType($"campaign_type"))
step1.show
// +--------+------------+--------------------+
// |    name|organisation|       campaign_type|
// +--------+------------+--------------------+
// |Surender|         cts|          [INTERNAL]|
// |    Raja|         cts|          [EXTERNAL]|
// |    Ajay|         tcs|[EXTERNAL, INTERNAL]|
// +--------+------------+--------------------+

val step2 = step1.select($"name", $"organisation", explode($"campaign_type"))
step2.show
// +--------+------------+--------+
// |    name|organisation|     col|
// +--------+------------+--------+
// |Surender|         cts|INTERNAL|
// |    Raja|         cts|EXTERNAL|
// |    Ajay|         tcs|EXTERNAL|
// |    Ajay|         tcs|INTERNAL|
// +--------+------------+--------+
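
Note that explode names the output column col by default, as you can see above. If you want to keep the original column name, you can alias the exploded column (same step1 as above):

val step2 = step1.select($"name", $"organisation",
  explode($"campaign_type").as("campaign_type"))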

EDIT:

You don't actually need a udf; you can use a when().otherwise expression instead for step1, as follows:

// lit is needed here: array("EXTERNAL", "INTERNAL") with bare strings
// would be interpreted as column names, not literal values
val step1 = df.withColumn("campaign_type",
  when(col("campaign_type") === "ALL", array(lit("EXTERNAL"), lit("INTERNAL")))
    .otherwise(array(col("campaign_type"))))

Upvotes: 3

Jacek Laskowski

Reputation: 74709

Since it's Scala...

If you want to write more idiomatic Scala code (perhaps trading away some performance, since these typed transformations miss some of Catalyst's optimizations), you can use the flatMap operator (signature shown with the implicit encoder parameter removed):

flatMap[U](func: (T) ⇒ TraversableOnce[U]): Dataset[U] Returns a new Dataset by first applying a function to all elements of this Dataset, and then flattening the results.

NOTE: flatMap is equivalent to the explode function, but you don't have to define a UDF (as in the other answer).

A solution could be as follows:

// input is assumed to be the sample file loaded without a header,
// hence the default _c0.._c2 column names; I don't care about the
// column names since we use Scala, as you did in your own code
scala> input.show
+--------+---+--------+
|     _c0|_c1|     _c2|
+--------+---+--------+
|Surender|cts|INTERNAL|
|    Raja|cts|EXTERNAL|
|    Ajay|tcs|     All|
+--------+---+--------+

val result = input.
  as[(String, String, String)].
  flatMap { case r @ (name, org, campaign) => 
    if ("all".equalsIgnoreCase(campaign)) {
      Seq("INTERNAL", "EXTERNAL").map { cname =>
        (name, org, cname)
      }
    } else Seq(r)
  }
scala> result.show
+--------+---+--------+
|      _1| _2|      _3|
+--------+---+--------+
|Surender|cts|INTERNAL|
|    Raja|cts|EXTERNAL|
|    Ajay|tcs|INTERNAL|
|    Ajay|tcs|EXTERNAL|
+--------+---+--------+
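
If you want the original column names back instead of _1/_2/_3, a small follow-up (same result as above) is to name them explicitly:

val named = result.toDF("name", "organisation", "campaign_type")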

Comparing the performance of the two queries, i.e. flatMap-based vs explode-based, I think the explode-based one may be slightly faster and better optimized, since some of the work stays under Spark's control (logical operators that get mapped to their physical counterparts). With flatMap, the entire optimization is your responsibility as a Scala developer.
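
You can see the difference yourself by comparing the plans with explain (a quick check; step2 is the explode-based query from the other answer, result is the flatMap-based one above):

// explode-based query: stays entirely within Catalyst-optimized operators
step2.explain(true)

// flatMap-based query: the physical plan contains DeserializeToObject and
// SerializeFromObject around the lambda
result.explain(true)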

The red-bounded area below corresponds to the flatMap-based code, and the warning signs mark the very costly DeserializeToObject and SerializeFromObject operators.

[Screenshot: SQL query plan in the Spark UI, with the flatMap-based part outlined in red]

What's interesting is the number of Spark jobs per query and their durations. It appears that the explode-based query takes 2 Spark jobs and 200 ms, while the flatMap-based one takes only 1 Spark job and 43 ms.
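
If you want a rough wall-clock comparison outside the Spark UI (the numbers above come from the UI, not from code), a naive sketch could look like this; on such a tiny dataset it mostly measures scheduling overhead:

// crude timing helper: runs the body once and prints the elapsed time
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val res = body
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  res
}

time("explode-based") { step2.collect() }
time("flatMap-based") { result.collect() }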

[Screenshot: Spark UI jobs view showing the number of jobs and durations for the two queries]

That surprises me a lot and suggests that the flatMap-based query could be faster (!)

Upvotes: 3
