AntarianCoder

Reputation: 45

Create Tuple out of Array[Array[String]] of Varying Sizes using Scala

I am new to Scala and I am trying to make tuple pairs out of an RDD of type Array[Array[String]] that looks like:

(122abc,223cde,334vbn,445das),(221bca,321dsa),(231dsa,653asd,698poq,897qwa)

I am trying to create tuple pairs out of these arrays so that the first element of each array is the key and each of the other elements is a value. For example, the output would look like:

122abc    223cde
122abc    334vbn
122abc    445das
221bca    321dsa
231dsa    653asd
231dsa    698poq
231dsa    897qwa

I can't figure out how to separate the first element from each array and then map it to every other element.

Upvotes: 1

Views: 1381

Answers (4)

Jack Leow

Reputation: 22477

If I'm reading it correctly, the core of your question is separating the head (first element) of each inner array from its tail (the remaining elements), which you can do with the head and tail methods. RDDs provide many of the same operations as Scala collections (map, flatMap, filter and so on), so you can do all of this with what looks like plain Scala code.
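As a quick illustration on a single inner array from the question (plain Scala, no Spark; the object name `HeadTailDemo` is only illustrative), `head` returns the first element and `tail` returns a new array holding the rest. Both throw on an empty array, so filter out empty rows first if your data can contain them.

```scala
object HeadTailDemo {
  // One inner array from the question's data
  val arr: Array[String] = Array("122abc", "223cde", "334vbn", "445das")

  val key: String = arr.head            // the first element, "122abc"
  val values: Array[String] = arr.tail  // the remaining three elements

  // Pair the key with every remaining element
  val pairs: Array[(String, String)] = values.map(v => key -> v)

  def main(args: Array[String]): Unit = pairs.foreach(println)
}
```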

Given the following input RDD:

import org.apache.spark.rdd.RDD

val input: RDD[Array[Array[String]]] = sc.parallelize(
  Seq(
    Array(
      Array("122abc","223cde","334vbn","445das"),
      Array("221bca","321dsa"),
      Array("231dsa","653asd","698poq","897qwa")
    )
  )
)

The following should do what you want:

val output: RDD[(String,String)] =
  input.flatMap { arrArrStr: Array[Array[String]] =>
    arrArrStr.flatMap { arrStrs: Array[String] =>
      arrStrs.tail.map { value => arrStrs.head -> value }
    }
  }

And in fact, because of how the flatMap/map calls are composed, you could rewrite it as a for-comprehension:

val output: RDD[(String,String)] =
  for {
    arrArrStr: Array[Array[String]] <- input
    arrStr: Array[String] <- arrArrStr
    str: String <- arrStr.tail
  } yield (arrStr.head -> str)

Which one you go with is ultimately a matter of personal preference (though in this case, I prefer the latter, as you don't have to indent code as much).
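The same logic can be tried without a cluster on local arrays. This sketch assumes the data sits in a plain `Array[Array[String]]` (the single element of the RDD above) and checks that the nested `flatMap`/`map` version and the for-comprehension produce the same pairs; `LocalPairs` is a hypothetical name.

```scala
object LocalPairs {
  // Plain-Scala stand-in for the RDD's contents (no Spark needed)
  val data: Array[Array[String]] = Array(
    Array("122abc", "223cde", "334vbn", "445das"),
    Array("221bca", "321dsa"),
    Array("231dsa", "653asd", "698poq", "897qwa")
  )

  // Nested flatMap/map, mirroring the first RDD version
  val viaFlatMap: Array[(String, String)] =
    data.flatMap(arr => arr.tail.map(value => arr.head -> value))

  // The same logic as a for-comprehension (desugars to the flatMap/map above)
  val viaFor: Array[(String, String)] =
    for {
      arr   <- data
      value <- arr.tail
    } yield arr.head -> value

  def main(args: Array[String]): Unit = viaFor.foreach(println)
}
```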

For verification:

output.collect().foreach(println)

Should print out:

(122abc,223cde)
(122abc,334vbn)
(122abc,445das)
(221bca,321dsa)
(231dsa,653asd)
(231dsa,698poq)
(231dsa,897qwa)

Upvotes: 2

stack0114106

Reputation: 8711

Using a DataFrame and explode:

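import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{explode, struct}
import spark.implicits._ // toDF and 'column syntax; assumes a SparkSession named spark
val df = Seq(
  Array("122abc","223cde","334vbn","445das"),
  Array("221bca","321dsa"),
  Array("231dsa","653asd","698poq","897qwa")
).toDF("arr")
val df2 = df
  .withColumn("key", 'arr(0))           // first array element is the key
  .withColumn("values", explode('arr))  // one row per array element
  .filter('key =!= 'values)             // drop the row pairing the key with itself
  .drop('arr)
  .withColumn("tuple", struct('key, 'values))
df2.show(false)
df2.rdd.map(x => Row((x(0), x(1)))).collect.foreach(println)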

Output:

+------+------+---------------+
|key   |values|tuple          |
+------+------+---------------+
|122abc|223cde|[122abc,223cde]|
|122abc|334vbn|[122abc,334vbn]|
|122abc|445das|[122abc,445das]|
|221bca|321dsa|[221bca,321dsa]|
|231dsa|653asd|[231dsa,653asd]|
|231dsa|698poq|[231dsa,698poq]|
|231dsa|897qwa|[231dsa,897qwa]|
+------+------+---------------+


[(122abc,223cde)]
[(122abc,334vbn)]
[(122abc,445das)]
[(221bca,321dsa)]
[(231dsa,653asd)]
[(231dsa,698poq)]
[(231dsa,897qwa)]
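One behavioural difference from the head/tail approach: the filter removes every value equal to the key, not just the first element. A plain-Scala sketch of both strategies (hypothetical helper names, no Spark involved) shows they agree on the question's data but differ when a value repeats the key:

```scala
object ExplodeFilterSketch {
  // Mimics key = arr(0), explode(arr), then filter(key =!= values)
  def explodeAndFilter(rows: Seq[Seq[String]]): Seq[(String, String)] =
    rows.flatMap { arr =>
      val key = arr(0)
      arr.map(v => (key, v)).filter { case (k, v) => k != v }
    }

  // Pairs the first element with the tail, as in the head/tail approach
  def headTail(rows: Seq[Seq[String]]): Seq[(String, String)] =
    rows.flatMap(arr => arr.tail.map(v => (arr.head, v)))
}
```

For a row such as `Seq("a", "b", "a")`, the filter-based version yields only `(a,b)`, while the head/tail version also keeps `(a,a)`.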

Update 1:

Using a pair RDD:

import org.apache.spark.rdd.PairRDDFunctions
import scala.collection.mutable
import spark.implicits._
val df = Seq(
  Array("122abc","223cde","334vbn","445das"),
  Array("221bca","321dsa"),
  Array("231dsa","653asd","698poq","897qwa")
).toDF("arr")
// Key each Row by the first element of its "arr" column
val rdd1 = df.rdd.map { x =>
  val y = x.getAs[mutable.WrappedArray[String]]("arr")(0)
  (y, x)
}
val pair = new PairRDDFunctions(rdd1)
// Flatten each Row's array into (key, element) pairs, then drop the (key, key) pair
pair.flatMapValues(x => x.getAs[mutable.WrappedArray[String]]("arr"))
  .filter(x => x._1 != x._2)
  .collect.foreach(println)

Results:

(122abc,223cde)
(122abc,334vbn)
(122abc,445das)
(221bca,321dsa)
(231dsa,653asd)
(231dsa,698poq)
(231dsa,897qwa)
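`flatMapValues` keeps each key and emits one pair per element of the collection produced from its value. The local sketch below mirrors that on a `Seq` of pairs; `flatMapValuesLocal` is a hypothetical helper for illustration, not part of Spark's API.

```scala
object FlatMapValuesSketch {
  // Hypothetical local analogue of flatMapValues:
  // each (k, v) becomes one (k, u) for every u produced by f(v)
  def flatMapValuesLocal[K, V, U](pairs: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
    pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

  // Key each array by its first element, as rdd1 does with each Row
  val keyed: Seq[(String, Seq[String])] =
    Seq(
      Seq("122abc", "223cde", "334vbn", "445das"),
      Seq("221bca", "321dsa"),
      Seq("231dsa", "653asd", "698poq", "897qwa")
    ).map(arr => (arr.head, arr))

  // Flatten each array back out, then drop the (key, key) pair
  val result: Seq[(String, String)] =
    flatMapValuesLocal(keyed)(arr => arr).filter { case (k, v) => k != v }
}
```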

Upvotes: 1

Amit Prasad

Reputation: 725

Convert your input into a single Seq, and then write a helper that turns it into a Seq of pairs, i.e. Seq((item1,item2), (item2,item3), ...).

Try the code below:

val seqs = Seq("122abc","223cde","334vbn","445das")++
Seq("221bca","321dsa")++
Seq("231dsa","653asd","698poq","897qwa")

Write a helper that converts the Seq into pairs of adjacent elements:

def toPairs[A](xs: Seq[A]): Seq[(A,A)] = xs.zip(xs.tail)
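`zip` pairs elements by position and stops at the end of the shorter collection, so zipping a sequence with its own `tail` pairs each element with its successor (n - 1 pairs for n elements). A small check, with `ToPairsDemo` as an illustrative name:

```scala
object ToPairsDemo {
  def toPairs[A](xs: Seq[A]): Seq[(A, A)] = xs.zip(xs.tail)

  // Three elements give two adjacent pairs
  val small: Seq[(String, String)] = toPairs(Seq("a", "b", "c"))
}
```

Calling `toPairs` on an empty `Seq` throws, because `tail` of an empty sequence does.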

Now pass your Seq to it and it will give you the pairs:

toPairs(seqs).mkString(" ")

Converting the result to a string gives output like:

res8: String = (122abc,223cde) (223cde,334vbn) (334vbn,445das) (445das,221bca) (221bca,321dsa) (321dsa,231dsa) (231dsa,653asd) (653asd,698poq) (698poq,897qwa)

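You can then convert the string however you want. Note that zip(xs.tail) pairs each element with the one that follows it, across the boundaries of the original arrays, so this produces pairs such as (223cde,334vbn) and (445das,221bca) rather than the first-element-keyed pairs shown in the question.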

Upvotes: 1

erip

Reputation: 16935

This is a classic fold operation; in Spark, a fold whose result type differs from the element type is done with aggregate:

// Assumes data is an RDD[Array[String]]; start with an empty array
data.aggregate(Array.empty[(String, String)])(
  // seqOp: `arr.drop(1).map(e => (arr.head, e))` creates tuples of the
  // first element with every other element in the row;
  // append them to the partition's accumulator.
  { case (acc, arr) => acc ++ arr.drop(1).map(e => (arr.head, e)) },
  // combOp: merge the accumulators built on different partitions
  _ ++ _
)
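Spark's `RDD.aggregate` takes a zero value plus two functions: a `seqOp` that folds elements into an accumulator within each partition, and a `combOp` that merges the per-partition accumulators. The sketch below imitates that two-step shape locally, treating a `Seq` of row groups as hypothetical partitions; it is an illustration, not Spark's implementation.

```scala
object AggregateSketch {
  type Acc = Array[(String, String)]

  // seqOp: fold one row into a partition's accumulator
  val seqOp: (Acc, Array[String]) => Acc =
    (acc, arr) => acc ++ arr.drop(1).map(e => (arr.head, e))

  // combOp: merge the accumulators of two partitions
  val combOp: (Acc, Acc) => Acc = _ ++ _

  // Fold each "partition" from the zero value with seqOp,
  // then merge the partial results with combOp
  def aggregateLocal(partitions: Seq[Seq[Array[String]]]): Acc =
    partitions
      .map(_.foldLeft(Array.empty[(String, String)])(seqOp))
      .foldLeft(Array.empty[(String, String)])(combOp)
}
```

Because `++` is not commutative, the order of pairs across partitions in a real Spark job may depend on which partitions finish first; sort the result if order matters.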

The same solution in a non-Spark environment:

scala> val data = Array(Array("122abc","223cde","334vbn","445das"),Array("221bca","321dsa"),Array("231dsa","653asd","698poq","897qwa"))
scala> data.foldLeft(Array.empty[(String, String)]) { case (acc, arr) =>
     |     acc ++ arr.drop(1).map(e => (arr.head, e))
     | }
res0: Array[(String, String)] = Array((122abc,223cde), (122abc,334vbn), (122abc,445das), (221bca,321dsa), (231dsa,653asd), (231dsa,698poq), (231dsa,897qwa))

Upvotes: 1
