Reputation: 61
I'm trying to store the output of reduce() into an RDD so that I can apply more transformations on it.
Here is what I have tried. I have a text file like this:
hello
i'm Arya
i'm 21 yrold
Hello
i'm Jack
i'm 30.
i am ali.
i'm Harry
I'am 40 years old
and i am an engineer.
I want to merge the lines, but not the paragraphs.
rdd = sc.textFile('asd.txt')
rdd1 = sc.parallelize([rdd.reduce(lambda x, y: "\n".join([x, y]))])
This works, but there should be a more efficient way; I don't want to create another RDD each time.
Upvotes: 0
Views: 334
Reputation: 35249
I believe this question deserves a more elaborate answer. Let's start with this piece of code:
rdd.reduce(lambda x,y: "\n".join([x,y]))
Contrary to what you may think, it doesn't guarantee that the values are merged in any specific order. If you port it to Scala, for example, you are likely to get a completely mixed-up result.
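A quick illustration of why (a toy sketch, not from the question): reduce() folds each partition locally and then folds the per-partition results, so any order-sensitive function is at the mercy of the partitioning.
nums = sc.parallelize([1, 2, 3, 4], 2)  # two partitions: [1, 2] and [3, 4]
nums.reduce(lambda x, y: x - y)
# per partition: 1 - 2 = -1 and 3 - 4 = -1, then (-1) - (-1) = 0
# a single left-to-right pass would instead give ((1 - 2) - 3) - 4 = -8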
Next, there is no use in having an RDD with a single item. So if you end up with a single value and want to "apply more transformation on it", just use plain Python objects.
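For example, a minimal sketch of the plain-Python route (assuming the goal is just one merged string):
text = "\n".join(rdd.collect())  # bring the lines to the driver as one str
lines = text.split("\n")         # further "transformations" are plain Python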
Is wholeTextFiles any better? It is not. With a single file it suffers from the same problem as keeping a local object. Finally, the wholeTextFiles implementation is fairly inefficient, so the overall memory footprint in PySpark can be a few times larger than the size of the data.
You didn't provide enough context, but I'll make an educated guess and assume you want to separate blocks of data. If I'm right, you should use a custom delimiter (creating a Spark data structure from a multiline record):
rdd = sc.newAPIHadoopFile(
    '/tmp/asd.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\n\n'}
).values()
which will split your data like this:
rdd.take(3)
# ["hello\ni'm Arya\ni'm 21 yrold", "Hello\ni'm Jack\ni'm 30.", 'i am ali.']
Upvotes: 1
Reputation: 41957
You can use the wholeTextFiles function for reading the file, which gives you a Tuple2(filename, text). The text is the whole content of the file, which is what you are trying to build with your join.
rdd = sc.wholeTextFiles("asd.txt").map(lambda x: x[1])
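As a sketch of what this yields (assuming asd.txt holds the sample text above), the RDD contains a single string with the entire file:
rdd.first()
# "hello\ni'm Arya\ni'm 21 yrold\n\nHello\n..."  # illustrative, truncated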
Upvotes: 0