Reputation: 298
I'm new to Python.
Scala Code:
rdd1 is in string format
val rdd1 = sc.parallelize(Seq(
  "[Canada,47;97;33;94;6]",
  "[Canada,59;98;24;83;3]",
  "[Canada,77;63;93;86;62]"))
val resultRDD = rdd1.map { r =>
  val Array(country, values) = r.replaceAll("\\[|\\]", "").split(",")
  country -> values
}.reduceByKey((a, b) =>
  a.split(";").zip(b.split(";"))
    .map { case (i1, i2) => i1.toInt + i2.toInt }
    .mkString(";"))
Output:
Country,Values // I have put the column names here to show that the output should have two columns
Canada,183;258;150;263;71
Upvotes: 2
Views: 204
Reputation: 13998
Edit: OP wants to use map instead of flatMap, so I adjusted flatMap to map, which just means taking the single item out of the list comprehension, i.e. map(lambda x: [...][0]).
Side note: this change is valid only in this particular case, where the list comprehension returns a list with exactly one item. In more general cases a plain map() is not enough, because flatMap() also flattens the resulting lists into individual elements.
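In plain-Python terms (no Spark needed), the map vs. flatMap difference can be sketched locally on the same kind of input; the parse() helper below is just for illustration, not part of the Spark API:

```python
# Local plain-Python sketch (no Spark) of the map vs. flatMap difference.
# parse() is an illustrative helper, not a Spark function.
data = ["[Canada,47;97;33;94;6]", "[Canada,59;98;24;83;3]"]

def parse(x):
    # the list comprehension returns a one-item list of (country, tuple) pairs
    return [(e[0], tuple(map(int, e[1].split(';'))))
            for e in [x.strip('][').split(',')]]

mapped = [parse(x) for x in data]                        # map: keeps the one-item lists
flat_mapped = [pair for x in data for pair in parse(x)]  # flatMap: unwraps them
mapped_0 = [parse(x)[0] for x in data]                   # map + [0]: same as flatMap here

assert mapped_0 == flat_mapped
```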
One way with an RDD is to use a list comprehension to strip, split and convert the string into a key-value pair, with the country as key and a tuple of numbers as value. Since we use a list comprehension, we apply flatMap to the RDD elements, then use reduceByKey to do the calculation and mapValues to convert the resulting tuple back into a string:
rdd1.map(lambda x: [(e[0], tuple(map(int, e[1].split(';')))) for e in [x.strip('][').split(',')]][0]) \
    .reduceByKey(lambda x, y: tuple(x[i] + y[i] for i in range(len(x)))) \
    .mapValues(lambda x: ';'.join(map(str, x))) \
    .collect()
output after the map step:
[('Canada', (47, 97, 33, 94, 6)),
('Canada', (59, 98, 24, 83, 3)),
('Canada', (77, 63, 93, 86, 62))]
output after reduceByKey:
[('Canada', (183, 258, 150, 263, 71))]
output after mapValues:
[('Canada', '183;258;150;263;71')]
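The whole pipeline can also be checked locally without a Spark cluster. This is a plain-Python emulation of the map/reduceByKey/mapValues steps over the same sample data (not Spark code, just the equivalent logic):

```python
# Plain-Python emulation of the RDD pipeline above (no Spark needed).
data = ["[Canada,47;97;33;94;6]",
        "[Canada,59;98;24;83;3]",
        "[Canada,77;63;93;86;62]"]

# map step: parse each string into (country, tuple-of-ints)
pairs = [(e[0], tuple(map(int, e[1].split(';'))))
         for x in data for e in [x.strip('][').split(',')]]

# reduceByKey step: element-wise sum of tuples sharing the same key
sums = {}
for country, values in pairs:
    prev = sums.get(country)
    sums[country] = values if prev is None else tuple(a + b for a, b in zip(prev, values))

# mapValues step: join each summed tuple back into a semicolon-separated string
result = [(k, ';'.join(map(str, v))) for k, v in sums.items()]
# result == [('Canada', '183;258;150;263;71')]
```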
Upvotes: 2
Reputation: 2825
You can do something like this with the DataFrame API:
import pyspark.sql.functions as f
from pyspark.sql.functions import col
myRDD = sc.parallelize([('Canada', '47;97;33;94;6'), ('Canada', '59;98;24;83;3'),('Canada', '77;63;93;86;62')])
df = myRDD.toDF()
>>> df.show(10)
+------+--------------+
| _1| _2|
+------+--------------+
|Canada| 47;97;33;94;6|
|Canada| 59;98;24;83;3|
|Canada|77;63;93;86;62|
+------+--------------+
df.select(
    col("_1").alias("country"),
    f.split("_2", ";").alias("values"),
    f.posexplode(f.split("_2", ";")).alias("pos", "val")
)\
.drop("val")\
.select(
    "country",
    f.concat(f.lit("position"), f.col("pos").cast("string")).alias("name"),
    f.expr("values[pos]").alias("val")
)\
.groupBy("country").pivot("name").agg(f.sum("val"))\
.show()
+-------+---------+---------+---------+---------+---------+
|country|position0|position1|position2|position3|position4|
+-------+---------+---------+---------+---------+---------+
| Canada| 183.0| 258.0| 150.0| 263.0| 71.0|
+-------+---------+---------+---------+---------+---------+
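Conceptually, the posexplode/pivot/sum combination computes the following; here is a local plain-Python sketch of the same logic (no Spark, just to show what the pivot produces):

```python
# Plain-Python sketch of what posexplode + pivot + sum compute (no Spark).
rows = [('Canada', '47;97;33;94;6'),
        ('Canada', '59;98;24;83;3'),
        ('Canada', '77;63;93;86;62')]

# posexplode: one (country, pos, val) record per semicolon-separated value
exploded = [(country, pos, float(val))
            for country, s in rows
            for pos, val in enumerate(s.split(';'))]

# pivot + sum: one "positionN" column per position, summed within each country
pivoted = {}
for country, pos, val in exploded:
    key = (country, f'position{pos}')
    pivoted[key] = pivoted.get(key, 0.0) + val
# pivoted[('Canada', 'position0')] == 183.0, and so on for positions 1-4
```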
Upvotes: 1