Reputation: 1004
Hi I am using scala to identify a 1st word of the row and create a unique value and append it in the RDD. But i don't know how to do that. I am new to scala so please forgive if this question sounds lame. Sample that i am trying is given below.
Sample:
OBR|1|METABOLIC PANEL
OBX|1|Glucose
OBX|2|BUN
OBX|3|CREATININE
OBR|2|RFLX TO VERIFICATION
OBX|1|EGFR
OBX|2|SODIUM
OBR|3|AMBIGUOUS DEFAULT
OBX|1|POTASSIUM
I want to check if the first word is OBR or not if it is OBR than i create a unique value and want to append it in the OBR and underneath OBX untill i found one more OBR this i want to do. But how can i do this ? I am bringing my data from HDFS.
Expected Result :
OBR|1|METABOLIC PANEL|OBR_filename_1
OBX|1|Glucose|OBR_filename_1
OBX|2|BUN|OBR_filename_1
OBX|3|CREATININE|OBR_filename_1
OBR|2|RFLX TO VERIFICATION|OBR_filename_2
OBX|1|EGFR|OBR_filename_2
OBX|2|SODIUM|OBR_filename_2
OBR|3|AMBIGUOUS DEFAULT|OBR_filename_3
OBX|1|POTASSIUM|OBR_filename_3
Upvotes: 0
Views: 823
Reputation: 746
Ok so as mentioned in my comment this will only work on a single core and shouldn't be done using spark unless someone can shed some light on something I'm missing. I'm assuming the file is just a text file on hdfs as described in your example.
val text: RDD[(String, Long)] = sc.textFile(<path>).zipWithIndex
val tupled: RDD[((String, Int, String), Int)] = text.map{case (r, i) => (r.split('|'), i)).map{case (s, i) => ((s(0), s(1).toInt, s(2)), i)}
val obrToFirstIndex: Array[(Int, Long)] = tupled.filter(_._1._1 == "OBR").map{case (t, i) => (t._2, i)}.reduceByKey(Math.min).collect()
val bcIndexes = sc.broadcast(obrToFirstIndex.sortBy(_._2))
val withObr = tupled.mapValues(i => bcIndexes.value.find(_._2 >= i).getOrElse(bcIndexes.value.last)._1)
val result: RDD[String] = withObr.map{case ((t1, t2, t2), obrind) => Array(t1, t2, t3, s"OBR_filaneme_$obrind").mkString("|")
On my current ennvironement I can't test the above so it may be subject to off by one errors or minor typos but the idea is there. But let me reiterate, this is not a job for spark.
EDIT: Just occured to me since there's only one part you could use a mapPartitions and just write the code how you would in Java/Scala within that partition.
The issue you were encountering is that the find is incorrect, it needs a different condition to work. Here is the simpler method I hinted to earlier with mapPartitions
val text: RDD[String] = sc.textFile(<path>)
val result: RDD[String] = text.mapPartitions{part =>
var obrInd = 0
part.map{r =>
val code= r.split('|')(0)
if(code == "OBR") obrInd += 1
r + "|OBR_filename_" + obrInd
}
}
Upvotes: 1