user2773013

Reputation: 3172

Spark processing columns in parallel

I've been playing with Spark, and I managed to get it to crunch my data. My data consists of a flat, delimited text file with 50 columns and about 20 million rows. I have Scala scripts that will process each column.

In terms of parallel processing, I know that RDD operations run on multiple nodes. So every time I process a column, the work on that column runs in parallel, but the columns themselves are handled one after another.

A simple example: suppose my data is a 5-column tab-delimited text file, each column contains text, and I want to do a word count for each column. I would do:

for (i <- 0 until 5) {
  data.map(_.split("\t", -1)(i))  // extract column i
    .map((_, 1))
    .reduceByKey(_ + _)           // word count for column i
    .collect()                    // action: triggers the job
}

Although the work within each column runs in parallel, the columns themselves are processed sequentially (bad wording, I know. Sorry!). In other words, column 2 is processed after column 1 is done; column 3 is processed after columns 1 and 2 are done, and so on.

My question is: is there any way to process multiple columns at a time? If you know a way, or a tutorial, would you mind sharing it with me?

Thank you!!

Upvotes: 6

Views: 3909

Answers (2)

bourneli

Reputation: 2230

You can try the following code, which uses Scala's parallel collections feature:

(0 until 5).map(index => (index, data)).par.map(x => {
  // x._1 is the column index, x._2 is the (shared) RDD reference
  x._2.map(_.split("\t", -1)(x._1)).map((_, 1)).reduceByKey(_ + _).collect()
})

data is just a reference, so duplicating it does not cost much. An RDD is read-only, so processing it from several threads at once is safe. The par method turns the collection into a parallel collection, so each column's job is submitted from its own thread. You can check the parallel jobs on the Spark web UI.
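The same idea can also be sketched with scala.concurrent.Future instead of parallel collections. This is just an alternative sketch, assuming data is the RDD[String] of raw lines from the question and that you are in spark-shell (so the pair-RDD implicits are in scope):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// One Future per column; each collect() submits its own Spark job,
// so the five word counts run concurrently.
val perColumn = (0 until 5).map { i =>
  Future {
    data.map(_.split("\t", -1)(i)).map((_, 1)).reduceByKey(_ + _).collect()
  }
}
val results = Await.result(Future.sequence(perColumn), 10.minutes)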

Upvotes: 1

zhang zhan

Reputation: 1596

Suppose each input row is a Seq. The following can be done to process the columns concurrently. The basic idea is to use the pair (column, value) as the key.

scala> val rdd = sc.parallelize((1 to 4).map(x=>Seq("x_0", "x_1", "x_2", "x_3")))
rdd: org.apache.spark.rdd.RDD[Seq[String]] = ParallelCollectionRDD[26] at parallelize at <console>:12

scala> val rdd1 = rdd.flatMap{x=>{(0 to x.size - 1).map(idx=>(idx, x(idx)))}}
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = FlatMappedRDD[27] at flatMap at <console>:14

scala> val rdd2 = rdd1.map(x=>(x, 1))
rdd2: org.apache.spark.rdd.RDD[((Int, String), Int)] = MappedRDD[28] at map at <console>:16

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[((Int, String), Int)] = ShuffledRDD[29] at reduceByKey at <console>:18

scala> rdd3.take(4)
res22: Array[((Int, String), Int)] = Array(((0,x_0),4), ((3,x_3),4), ((2,x_2),4), ((1,x_1),4))

In the example output, ((0,x_0),4) means: in the first column (index 0), the value x_0 occurred 4 times. You can start from here and process further.
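Adapting the same idea to the question's tab-delimited file looks roughly like this (a sketch, assuming data is the RDD[String] of raw lines):

// Key each value by its column index, then count everything in one job.
val counts = data
  .map(_.split("\t", -1))                                    // line -> array of columns
  .flatMap(cols => cols.indices.map(i => ((i, cols(i)), 1))) // ((column, value), 1)
  .reduceByKey(_ + _)                                        // single shuffle counts all columns

A single reduceByKey now counts every column at once, so all columns are processed within one parallel job instead of one sequential job per column.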

Upvotes: 3
