osman tamer

Reputation: 61

Pyspark - How to store action output into RDD

I'm trying to store the output of reduce() into an RDD, so I can apply more transformations to it.

Here is what I have tried.

I have a text file like this:

hello
i'm Arya
i'm 21 yrold

Hello
i'm Jack
i'm 30.

i am ali.

i'm Harry
I'am 40 years old
and i am an engineer.

I want to merge the lines, but not the paragraphs.

# merge every line into one string, then wrap the result back into an RDD
rdd = sc.textFile('asd.txt')
rdd1 = sc.parallelize([rdd.reduce(lambda x, y: "\n".join([x, y]))])

This works, but there should be a more efficient way. I don't want to create another RDD each time.

Upvotes: 0

Views: 334

Answers (2)

Alper t. Turker

Reputation: 35249

I believe this question deserves a more elaborate answer. Let's start with this piece of code:

rdd.reduce(lambda x, y: "\n".join([x, y]))

Contrary to what you may think, it doesn't guarantee that the values are merged in a specific order. If you, for example, port it to Scala, you are likely to get a completely mixed-up result.
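To see why, consider a minimal sketch (the data here is hypothetical): reduce expects a commutative and associative function, and "\n".join is associative but not commutative, so the order in which partition results are combined is an implementation detail:

# Hypothetical illustration: "\n".join is not commutative, so reduce
# gives no ordering guarantee across partitions.
rdd = sc.parallelize(["a", "b", "c", "d"], 4)
rdd.reduce(lambda x, y: "\n".join([x, y]))
# You may get "a\nb\nc\nd" on one setup, but nothing in the contract
# forbids the partitions being combined in a different order.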

Next, there is no use in having an RDD with a single item. If you do:

  • Data is not distributed - you are as good as having a local object.
  • Consequently, processing is not truly parallelized.

So if you have a single item and want to:

apply more transformations to it.

Just use plain Python objects.
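For example, a minimal sketch assuming the rdd from the question:

# reduce returns a plain Python string on the driver - no need to
# wrap it back into an RDD just to keep working with it
merged = rdd.reduce(lambda x, y: "\n".join([x, y]))
lines = merged.split("\n")  # further "transformations" are just Python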

Is wholeTextFiles any better? It is not. With a single file it suffers from the same problems as keeping a local object:

  • With a single file, all data goes to a single partition.
  • Processing is not distributed.
  • Data is eagerly loaded and not distributed, so when the size of the input grows, you may expect executor failures.

Finally, the wholeTextFiles implementation is fairly inefficient, so the overall memory footprint in PySpark can be a few times larger than the size of the data.

You didn't provide enough context, but I'll make an educated guess and assume you want to separate blocks of data. If I am right, you should use a custom record delimiter (see creating spark data structure from multiline record):

rdd = sc.newAPIHadoopFile(
    '/tmp/asd.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    # blank lines ("\n\n") mark record boundaries instead of single newlines
    conf={'textinputformat.record.delimiter': '\n\n'}
).values()

which will split your data like this:

rdd.take(3)
# ["hello\ni'm Arya\ni'm 21 yrold", "Hello\ni'm Jack\ni'm 30.", 'i am ali.']

Upvotes: 1

Ramesh Maharjan

Reputation: 41957

You can use the wholeTextFiles function to read the file, which gives you a Tuple2 of (filename, text). The text is the whole content of the file, which is exactly the string you are trying to build with join.

# wholeTextFiles yields (filename, content) pairs; keep only the content
rdd = sc.wholeTextFiles("asd.txt").map(lambda x: x[1])
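Note that the whole file then arrives as a single record, for example (a sketch assuming the same asd.txt):

rdd.take(1)
# ["hello\ni'm Arya\ni'm 21 yrold\n\nHello\ni'm Jack\ni'm 30.\n\n..."]
# (one string containing the entire file; output truncated here)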

Upvotes: 0
