ishwar

Reputation: 298

I want to do the same transformation in Python that I did in Scala.

I'm new to Python.

Scala Code:

rdd1 contains the records as strings:

val rdd1 = sc.parallelize(Seq("[Canada,47;97;33;94;6]", "[Canada,59;98;24;83;3]", "[Canada,77;63;93;86;62]"))



val resultRDD = rdd1.map { r =>
  val Array(country, values) = r.replaceAll("\\[|\\]", "").split(",")
  country -> values
}.reduceByKey((a, b) => a.split(";").zip(b.split(";")).map {
  case (i1, i2) => i1.toInt + i2.toInt }.mkString(";"))

Output:

Country,Values  //I added the column header to show that the output should have two columns
Canada,183;258;150;263;71

Upvotes: 2

Views: 204

Answers (2)

jxc

Reputation: 13998

Edit: the OP wants to use map instead of flatMap, so I adjusted flatMap to map; with map, you just need to take the first item out of the list comprehension, hence map(lambda x: [...][0]).

Side note: the above change is valid only in this particular case, where the list comprehension returns a list with a single item. For more general cases, you might need two map()s to replace what flatMap() does.
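For reference, a minimal sketch of the flatMap form this answer originally used; flatMap flattens the one-item list away, so no [0] indexing is needed:

rdd1.flatMap(lambda x: [ (e[0], tuple(map(int, e[1].split(';')))) for e in [x.strip('][').split(',')] ])
# yields the same pairs as the map(...)[0] version below, e.g. ('Canada', (47, 97, 33, 94, 6))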

One way with RDDs is to use a list comprehension to strip, split and convert each string into a key-value pair, with the country as key and a tuple of numbers as value. Since the list comprehension yields a one-item list, we take its first element inside map() (per the edit above), then use reduceByKey to do the calculation and mapValues to convert the resulting tuple back into a string:
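A sketch of the rdd1 setup in PySpark, using the raw strings from the question:

rdd1 = sc.parallelize(["[Canada,47;97;33;94;6]",
                       "[Canada,59;98;24;83;3]",
                       "[Canada,77;63;93;86;62]"])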

rdd1.map(lambda x: [ (e[0], tuple(map(int, e[1].split(';')))) for e in [x.strip('][').split(',')] ][0]) \
    .reduceByKey(lambda x,y: tuple([ x[i]+y[i] for i in range(len(x))]) ) \
    .mapValues(lambda x: ';'.join(map(str,x))) \
    .collect()

output after map:

[('Canada', (47, 97, 33, 94, 6)),
 ('Canada', (59, 98, 24, 83, 3)),
 ('Canada', (77, 63, 93, 86, 62))]

output after reduceByKey:

[('Canada', (183, 258, 150, 263, 71))]

output after mapValues:

[('Canada', '183;258;150;263;71')]
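If the nested comprehension feels dense, here is an equivalent sketch with a named helper (parse is a hypothetical name; same logic, just unrolled):

def parse(line):
    # strip the surrounding brackets, then split off the country
    country, values = line.strip('][').split(',')
    return country, tuple(int(v) for v in values.split(';'))

rdd1.map(parse) \
    .reduceByKey(lambda x, y: tuple(a + b for a, b in zip(x, y))) \
    .mapValues(lambda v: ';'.join(map(str, v))) \
    .collect()
# [('Canada', '183;258;150;263;71')]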

Upvotes: 2

Jayadeep Jayaraman

Reputation: 2825

You can do something like this:

import pyspark.sql.functions as f
from pyspark.sql.functions import col

myRDD = sc.parallelize([('Canada', '47;97;33;94;6'), ('Canada', '59;98;24;83;3'),('Canada', '77;63;93;86;62')])

df = myRDD.toDF()

>>> df.show(10)
+------+--------------+
|    _1|            _2|
+------+--------------+
|Canada| 47;97;33;94;6|
|Canada| 59;98;24;83;3|
|Canada|77;63;93;86;62|
+------+--------------+
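As an aside, passing column names to toDF avoids the default _1/_2 names (the select below would then reference country and values instead):

df = myRDD.toDF(["country", "values"])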


df.select(
        col("_1").alias("country"),
        f.split("_2", ";").alias("values"),
        f.posexplode(f.split("_2", ";")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "country",
        f.concat(f.lit("position"),f.col("pos").cast("string")).alias("name"),
        f.expr("values[pos]").alias("val")
    )\
    .groupBy("country").pivot("name").agg(f.sum("val"))\
    .show()

+-------+---------+---------+---------+---------+
|country|position0|position1|position2|position3|position4|
+-------+---------+---------+---------+---------+---------+
| Canada|    183.0|    258.0|    150.0|    263.0|     71.0|
+-------+---------+---------+---------+---------+---------+
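If you need the exact Country,Values string from the question, one possible follow-up (a sketch, assuming pivoted is the DataFrame produced by the pivot above) is to cast the sums to int and stitch the position columns back together with concat_ws:

pos_cols = ["position0", "position1", "position2", "position3", "position4"]
pivoted.select(
    "country",
    f.concat_ws(";", *[f.col(c).cast("int").cast("string") for c in pos_cols]).alias("values")
).show()

# expected, given the sums above:
# +-------+------------------+
# |country|            values|
# +-------+------------------+
# | Canada|183;258;150;263;71|
# +-------+------------------+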


Upvotes: 1
