user3428294

Reputation: 77

Custom partitioning in PySpark

I am trying to create a custom partitioner in a Spark job using PySpark. Say we have the following data:

 x = sc.parallelize([['a1','a2',0], ['b1','b2',0], ['c1','c2',1], ['d1','d2',1], ['e1','e2',1], ['f1','f2',2]])

I would like to split it based on the third element of each list. Below is the code I am trying, but it ends up with the error "too many values to unpack":

rdd = x.partitionBy(3,lambda x: int(x[2])).collect()

Below is the exact error I am getting:

ValueError: too many values to unpack
    org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)

Expected output:

[[['a1','a2',0], ['b1','b2',0]], [['c1','c2',1], ['d1','d2',1], ['e1','e2',1]], [['f1','f2',2]]]

Upvotes: 0

Views: 1024

Answers (1)

Pushkr

Reputation: 3619

As @Himaprasoon already pointed out, you can only partition on key-value pairs: partitionBy tries to unpack every element into a (key, value) pair, which is why a three-element list raises "too many values to unpack". In your situation you can try:

rdd = x.keyBy(lambda row: int(row[2])).partitionBy(3) 

rdd.values().glom().collect()

which gives:

[[['a1', 'a2', 0], ['b1', 'b2', 0]],
 [['c1', 'c2', 1], ['d1', 'd2', 1], ['e1', 'e2', 1]],
 [['f1', 'f2', 2]]]
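
For completeness, here is a minimal self-contained sketch of the same idea as a full script (assuming a local SparkContext; the helper name third_element and the explicit identity partitionFunc are illustrative, not something the question requires):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    x = sc.parallelize([['a1', 'a2', 0], ['b1', 'b2', 0], ['c1', 'c2', 1],
                        ['d1', 'd2', 1], ['e1', 'e2', 1], ['f1', 'f2', 2]])

    def third_element(row):
        # partitionBy only looks at the key of (key, value) pairs, so the
        # rows are first keyed by their third element
        return int(row[2])

    rdd = (x.keyBy(third_element)                       # -> (third-element key, row) pairs
            .partitionBy(3, partitionFunc=lambda k: k)  # key 0/1/2 goes to partition 0/1/2
            .values())                                  # drop the keys again

    print(rdd.glom().collect())

With the default hash-based partitionFunc the keys 0, 1 and 2 still land in separate partitions here, since hash(i) % 3 == i for small non-negative integers, but passing an explicit function makes the key-to-partition mapping intentional rather than incidental.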

Upvotes: 1
