Reputation: 2316
I have two RDDs in PySpark:
RDD1:
[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]
RDD2:
[(u'41',u'42.0'),(u'24',u'98.0'),....]
Both RDDs have the same number of rows. What I want to do is take all the columns in each row from RDD1 (converted from unicode to normal string) and the 2nd column from each row in RDD2 (converted from unicode string to float), and form a new RDD with that. So the new RDD will look like this:
RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd', 42.0), ('2013-01-31 00:00:00', 'a', 'ab', 'abc', 'g', 98.0),.....]
Once that is done, I want to aggregate the last value in each row (the float value) of this new RDD3 by the date value in the 1st column. That means all rows where the date is 2013-01-31 00:00:00 should have their last numeric values added together.
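For the sample rows above, for instance, both dates are 2013-01-31 00:00:00, so the aggregated result would contain ('2013-01-31 00:00:00', 140.0), i.e. 42.0 + 98.0.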
How can I do this in PySpark?
Upvotes: 5
Views: 2580
Reputation: 18022
You need to zipWithIndex your RDDs. This method creates a tuple pairing each entry of your data with its index, so you can then join both RDDs by that index.
Your approach should be similar to (I bet there are more efficient ways):
rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"])
rdd2 = sc.parallelize(xrange(5))
# zipWithIndex pairs each element with its index as (value, index);
# swap the pair so the index becomes the join key
zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v))
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v))
print zdd1.join(zdd2).collect()
The output will be:
[(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))]
After this, only a map is required to recompose the data, e.g.:
combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v)
print combinedRDD.collect()
# Alternatively, you can use the .zip method:
# combinedRDD = rdd1.zip(rdd2)
The output will be:
[(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]
As for the data type conversion, I have run into that problem before; to solve it I use this snippet:
import unicodedata

# NFKD-normalize the unicode value and encode it down to a plain ASCII str
convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1)
                            .encode('ascii', 'ignore'), v2)
combinedRDD = combinedRDD.map(convert)
print combinedRDD.collect()
The output will be:
[('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]
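To tie this back to the data in your question, here is a minimal sketch (assuming both RDDs zip cleanly, i.e. they have the same number of partitions and elements per partition, and that str() is safe because your values are ASCII-only; otherwise use the unicodedata snippet above):
rdd1 = sc.parallelize([(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),
                       (u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g')])
rdd2 = sc.parallelize([(u'41', u'42.0'), (u'24', u'98.0')])
# Pair rows positionally, cast RDD1's columns to str and RDD2's 2nd column to float
rdd3 = rdd1.zip(rdd2).map(lambda rows: tuple(str(c) for c in rows[0]) + (float(rows[1][1]),))
# Key by the date (1st column) and sum the float (last column) per date
totals = rdd3.map(lambda row: (row[0], row[-1])).reduceByKey(lambda x, y: x + y)
print(totals.collect())  # [('2013-01-31 00:00:00', 140.0)]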
Upvotes: 2
Reputation: 1816
For the first part of your question, that is, combining the two RDDs into one where each row is a 7-tuple, you can do this:
rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,b,c,d,e,f,float(g)))
I'm not sure what you eventually need. Is it just the date and the sum of the second value? If so, you don't need all the values:
rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a, float(g)))  # keep only (date, float)
rdd4 = rdd3.reduceByKey(lambda x, y: x + y)  # sum the floats per date
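With the sample rows from your question, rdd4.collect() would then give [('2013-01-31 00:00:00', 140.0)]. Note the float(g): without it, reduceByKey would concatenate the unicode strings instead of adding the numbers.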
Upvotes: 0