Reputation: 95
I have a text file containing tens of GBs of data, which I need to load from HDFS and parallelize as an RDD. This text file describes items in the following format. Note that the alphabetic labels are not present in the file (the meaning of each row is implicit) and that a row can contain whitespace separating different values:
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
0220 (weight)
3030 (amount)
I reckon that the most immediate approach to parallelize this file would be to upload it to HDFS from the local filesystem and then to create an RDD by executing sc.textFile(filepath). However, in this case, the partitioning would depend on the HDFS splits corresponding to the file.
The problem with said approach is that each partition can contain incomplete items. For example:
Partition 1
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
Partition 2
0220 (weight)
3030 (amount)
Thus, when we call a method for each partition and pass it the corresponding data block, it will receive an incomplete specification for the item identified as 0002, which will produce wrong results from the calculations performed inside that method.
What would be the most efficient way to partition or repartition this RDD to avoid this problem? Can I specify the number of lines in each partition to be a multiple of 4? If so, should it be done by Hadoop or Spark?
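To illustrate, this is roughly the read I have in mind (just a sketch; the path and names are placeholders):
val lines = sc.textFile("hdfs:///data/items.txt")

// Count the lines each partition received: nothing guarantees a multiple of 4,
// so the four lines of an item may end up split across two partitions.
lines
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
  .foreach { case (idx, n) => println(s"partition $idx: $n lines") }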
Upvotes: 3
Views: 1277
Reputation: 4499
Load the text file to obtain an RDD[String], then use zipWithIndex to convert it to an RDD[(String, Long)], where the second attribute of each tuple is the index number of the element in the RDD. From the zipWithIndex documentation:
Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
The indices will be [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]. Dividing each index by 4 (integer division) gives an idx_div: the first four lines get 0 as idx_div, the next four lines will get 1 as idx_div, and so on, e.g. [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, ...].
This idx_div can be used to group all (four) lines belonging to one record for further parsing and processing:
case class Record(id: String, dimensions: String, weight: String, amount: String)

val lines = sc.textFile("...")
val records = lines
  .zipWithIndex
  .groupBy(line_with_idx => line_with_idx._2 / 4) // group by idx_div
  .map(grouped_record => {
    val (idx_div: Long, lines_with_idx: Iterable[(String, Long)]) = grouped_record
    val lines_with_idx_list = lines_with_idx.toList.sortBy(_._2) // additional check to ensure ordering
    val lines_list = lines_with_idx_list.map(_._1)
    val List(id: String, dimensions: String, weight: String, amount: String) = lines_list
    Record(id, dimensions, weight, amount)
  })
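For example (a quick sketch of my own, assuming the 8-line sample file from the question), you can inspect the result with:
records.collect().foreach(println)
// Record(0001,1000 1000 2000,0100,0030)
// Record(0002,1110 1000 5000,0220,3030)
Note that groupBy does not guarantee the order of the groups, so the two records may be printed in either order.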
Upvotes: 3
Reputation: 3354
Why don't you simply group the lines before putting the file into HDFS, to avoid this problem?
xargs -L4 echo < file > grouped_file   # join every 4 lines into one (grouped_file is just an illustrative name)
hdfs dfs -put grouped_file /your/path
Your data will then look like:
0001 1000 1000 2000 0100 0030
0002 1110 1000 5000 0220 3030
If you do that, you can read your data using the Spark DataFrames API, which is better optimized than RDDs and gives you a richer API and better performance for writing your application.
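A minimal sketch of that read (my own example: the path, the column names and a SparkSession value named spark are assumptions, and the whitespace split matches the grouped file produced above):
import org.apache.spark.sql.functions._

val df = spark.read.textFile("/your/path/grouped_file")   // one item per line after grouping
  .select(split(col("value"), "\\s+").as("f"))            // split each line on whitespace
  .select(
    col("f")(0).as("id"),
    col("f")(1).as("dim_x"), col("f")(2).as("dim_y"), col("f")(3).as("dim_z"),
    col("f")(4).as("weight"),
    col("f")(5).as("amount"))

df.show()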
Upvotes: 1