Falcon Taylor-Carter

Reputation: 35

PySpark repartitioning RDD elements

I have a Spark job that reads from a Kafka stream and performs an action for each RDD in the stream. If the RDD is not empty, I want to save it to HDFS, but I want to create a file for each element in the RDD. I've found that

RDD.saveAsTextFile(file_location)

will create a file for each partition, so I am trying to change the RDD so that each partition contains only one element. Here's an example of what I'm trying to do:

data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0'])
data.glom().collect() #Produces [['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0']]
data.saveAsTextFile(file_location) #Produces 2 files

I can get closer to what I want, but I can't find a way to ensure each partition has only one element:

data1 = data.coalesce(1, True).repartition(data.count())
data1.glom().collect() #Produces [[], ['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0'], [], [], [], [], [], [], []] 
data2 = data.map(lambda t : t).coalesce(1, True).repartition(data.count())
data2.glom().collect() #Produces [[], ['1'], ['2', '3'], ['4', '5'], ['6'], ['7', '8'], ['9', '0'], [], [], []] 
data2.saveAsTextFile(file_location) #Produces 10 files, but some are empty

I know that in this example I could pass my desired number of partitions to sc.parallelize(), but that won't be possible when I am reading from a Kafka stream. Any recommendations on how to repartition the way I want, or on how to better approach this?

Upvotes: 3

Views: 8422

Answers (2)

ayan guha

Reputation: 1257

Well, here is a Python solution for custom partitioning.

(Just to be clear, getting each element into a separate file is probably not the best design.)

# Pair each element with itself so there is a key to partition on
data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0']).map(lambda x: (x, x))
print(data.collect())
c = data.count()
# One partition per element: the partition function maps key k to int(k),
# so with c partitions every key lands in its own bucket
wp = data.partitionBy(c, lambda k: int(k))
print(wp.map(lambda t: t[0]).glom().collect())
sc.stop()

Result:

[('1', '1'), ('2', '2'), ('3', '3'), ('4', '4'), ('5', '5'), ('6', '6'), ('7', '7'), ('8', '8'), ('9', '9'), ('0', '0')]
[['0'], ['1'], ['2'], ['3'], ['4'], ['5'], ['6'], ['7'], ['8'], ['9']]
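
For the original goal of one file per element, you can then drop the key again before saving. A minimal follow-up sketch, reusing file_location from the question:

wp.map(lambda t: t[1]).saveAsTextFile(file_location) # one part file per partition, i.e. per element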

Hope this helps.

Upvotes: 3

Holden

Reputation: 7442

Underneath, the Python partitioner API uses a hash partitioner, which is why even when you have K buckets you still get some "collisions". If you can do this in Scala, you can provide a custom partitioner (range-based + number of buckets == number of elements would probably do the trick). However, there is some per-partition overhead (and repartitioning is an expensive operation), so it might be more reasonable to do your saving logic inside a foreach instead of repartitioning.
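
To make the foreach suggestion concrete, here is a minimal sketch that writes one HDFS file per element without any repartitioning. It assumes the third-party hdfs (WebHDFS) Python package is installed on the executors; namenode_url, output_dir, and stream are hypothetical placeholders, not part of the original question:

from hdfs import InsecureClient

namenode_url = 'http://namenode:50070'  # hypothetical WebHDFS endpoint
output_dir = '/user/spark/output'       # hypothetical target directory

def save_partition(iterator):
    # One WebHDFS client per partition, reused for every element in it
    client = InsecureClient(namenode_url)
    for element, index in iterator:
        # zipWithIndex supplies a unique suffix, so each element gets its own file
        client.write('%s/element-%05d' % (output_dir, index), data=element, overwrite=True)

def save_rdd(rdd):
    if not rdd.isEmpty():
        rdd.zipWithIndex().foreachPartition(save_partition)

stream.foreachRDD(save_rdd)

This avoids the shuffle entirely; the trade-off is that the writes no longer go through Spark's normal saveAsTextFile output path.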

Upvotes: 2
