user4816771


Divide an RDD with Spark

I have an RDD that looks like this:

[u'1,0,0,0,0,0,0,0,1,2013,52,0,4,1,0',
 u'1,0,0,0,1,1,0,1,1,2012,49,1,1,0,1',
 u'1,0,0,0,1,1,0,0,1,2012,49,1,1,0,1',
 u'0,1,0,0,0,0,1,1,1,2014,45,0,0,1,0']

Is there a way to split it into three separate RDDs by filtering on the value of the year column, like this?

[u'1,0,0,0,0,0,0,0,1,2013,52,0,4,1,0']

and

[u'1,0,0,0,1,1,0,1,1,2012,49,1,1,0,1',
 u'1,0,0,0,1,1,0,0,1,2012,49,1,1,0,1']

and

[u'0,1,0,0,0,0,1,1,1,2014,45,0,0,1,0']

Upvotes: 3

Views: 622

Answers (2)

Ajay

Reputation: 5347

There's a better solution than this one, but I learned a lot working on it and spent so much time on it that I couldn't resist posting it.

In [60]: a
Out[60]: 
[u'1,0,0,0,0,0,0,0,1,2013,52,0,4,1,0',
 u'1,0,0,0,1,1,0,1,1,2012,49,1,1,0,1',
 u'1,0,0,0,1,1,0,0,1,2012,49,1,1,0,1',
 u'0,1,0,0,0,0,1,1,1,2014,45,0,0,1,0']

Working with strings was confusing for me, so I converted each row into a list of ints.

In [61]: b=[map(int,elem.split(',')) for elem in a]

In [62]: b
Out[62]: 
[[1, 0, 0, 0, 0, 0, 0, 0, 1, 2013, 52, 0, 4, 1, 0],
 [1, 0, 0, 0, 1, 1, 0, 1, 1, 2012, 49, 1, 1, 0, 1],
 [1, 0, 0, 0, 1, 1, 0, 0, 1, 2012, 49, 1, 1, 0, 1],
 [0, 1, 0, 0, 0, 0, 1, 1, 1, 2014, 45, 0, 0, 1, 0]]

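Next, sort b by year. The year is the sixth field from the end of each row, hence itemgetter(-6); itemgetter comes from the operator module.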

In [63]: b_s=sorted(b,key=itemgetter(-6))

In [64]: b_s
Out[64]: 
[[1, 0, 0, 0, 1, 1, 0, 1, 1, 2012, 49, 1, 1, 0, 1],
 [1, 0, 0, 0, 1, 1, 0, 0, 1, 2012, 49, 1, 1, 0, 1],
 [1, 0, 0, 0, 0, 0, 0, 0, 1, 2013, 52, 0, 4, 1, 0],
 [0, 1, 0, 0, 0, 0, 1, 1, 1, 2014, 45, 0, 0, 1, 0]]

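Finally, use groupby from the itertools module to group the sorted rows by year. groupby only merges consecutive rows that share a key, which is why b was sorted first.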

In [65]: [list(g) for k,g in groupby(b_s,key=itemgetter(-6))]
Out[65]: 
[[[1, 0, 0, 0, 1, 1, 0, 1, 1, 2012, 49, 1, 1, 0, 1],
  [1, 0, 0, 0, 1, 1, 0, 0, 1, 2012, 49, 1, 1, 0, 1]],
 [[1, 0, 0, 0, 0, 0, 0, 0, 1, 2013, 52, 0, 4, 1, 0]],
 [[0, 1, 0, 0, 0, 0, 1, 1, 1, 2014, 45, 0, 0, 1, 0]]]
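On Python 3, map returns an iterator rather than a list, so the session above needs a small adjustment. Here is a minimal sketch that packages the same sort-then-group steps as one function. The name group_rows_by_year and the choice to return a dict keyed by year are illustrative and not part of the answer. Like the session, it works on a plain Python list of lines (for example the result of rdd.collect()), not on an RDD.

```python
from itertools import groupby
from operator import itemgetter

# Position of the year field counted from the end of each row
# (index 9 from the start in the 15-field rows from the question).
YEAR = -6


def group_rows_by_year(lines):
    """Parse comma-separated lines into int lists and group them by year."""
    # list(...) is needed on Python 3, where map returns an iterator.
    rows = [list(map(int, line.split(","))) for line in lines]
    # groupby only merges *consecutive* items with equal keys,
    # so sort by the same key first (the sort is stable).
    rows.sort(key=itemgetter(YEAR))
    return {year: list(group) for year, group in groupby(rows, key=itemgetter(YEAR))}


lines = [
    "1,0,0,0,0,0,0,0,1,2013,52,0,4,1,0",
    "1,0,0,0,1,1,0,1,1,2012,49,1,1,0,1",
    "1,0,0,0,1,1,0,0,1,2012,49,1,1,0,1",
    "0,1,0,0,0,0,1,1,1,2014,45,0,0,1,0",
]
groups = group_rows_by_year(lines)
print(sorted(groups))     # [2012, 2013, 2014]
print(len(groups[2012]))  # 2
```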

Upvotes: 1

jaynp

Reputation: 3325

Here's one way using groupBy, assuming your original RDD is named rdd:

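# Group lines by the year field (index 9); the key is the string, e.g. u'2013'
rdd = rdd.groupBy(lambda x: x.split(",")[9])
# collect() brings every (year, lines) pair to the driver,
# then each group of lines is turned back into its own RDD
new_rdds = [sc.parallelize(x[1]) for x in rdd.collect()]

for x in new_rdds:
    print(x.collect())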
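The question asks specifically for a filter on the year column. An alternative sketch keeps the data distributed instead of collecting it to the driver and re-parallelizing it: look up the distinct years, then build one filter per year. This is not part of the answer above. The helper names year_of and split_by_year are hypothetical, the year is assumed to be field index 9, and only standard RDD methods (map, distinct, collect, filter, cache) are used. The code needs no pyspark import because it only calls methods on the RDD it is given.

```python
def year_of(line):
    """Return the year field (assumed to be index 9) of a comma-separated line as an int."""
    return int(line.split(",")[9])


def split_by_year(rdd):
    """Return a dict mapping each year to an RDD holding only that year's lines.

    Every filtered RDD rescans its source, so the source is cached first.
    """
    rdd = rdd.cache()
    years = sorted(rdd.map(year_of).distinct().collect())
    # Bind y as a default argument: the lambda may only be serialized when an
    # action runs, and a plain closure over y would then see the last year only.
    return {y: rdd.filter(lambda line, y=y: year_of(line) == y) for y in years}
```

With an existing SparkContext named sc, usage might look like parts = split_by_year(sc.parallelize(lines)) followed by parts[2012].collect(). The trade-off is one pass over the cached data per year, which is reasonable for a handful of years; the groupBy approach above is simpler when each group comfortably fits in driver memory.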

Upvotes: 3
