XYZ

Reputation: 382

Python (PySpark) nested list with reduceByKey: appending Python lists to create a nested list

I have an input RDD in the following format:

[('2002', ['cougar', 1]),
('2002', ['the', 10]),
('2002', ['network', 4]),
('2002', ['is', 1]),
('2002', ['database', 13])]

Here '2002' is the key, so I have key-value pairs of the form:

 ('year', ['word', count])

The count is an integer. I would like to use reduceByKey to get the following result:

[('2002', [['cougar', 1], ['the', 10], ['network', 4], ['is', 1], ['database', 13]])]

I am struggling to get a nested list like the one above; building the nested list is the main issue. For example, say I have three lists a, b and c:

a = ['cougar', 1]
b = ['the', 10]
c = ['network', 4]

a.append(b)

will change a to

 ['cougar', 1, ['the', 10]]

and

x = []
x.append(a)
x.append(b)

will give x as

  [['cougar', 1], ['the', 10]]

However, if I then call

  c.append(x)

it will change c to

  ['network', 4, [['cougar', 1], ['the', 10]]]

None of the above operations gives me the desired result.

I want to get

   [('2002', [[word1, c1], [word2, c2], [word3, c3], ...]),
   ('2003', [[w1, count1], [w2, count2], [w3, count3], ...])]

i.e. the nested list should be:

  [a, b, c]

where a, b and c are themselves lists with two elements each.

I hope the question is clear. Any advice?
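For reference, a minimal plain-Python sketch of that target shape. It assumes nothing beyond the three example lists: appending each pair to an initially empty list, or concatenating pairs that have each been wrapped in a list, gives [a, b, c] without modifying a, b or c.

```python
a = ['cougar', 1]
b = ['the', 10]
c = ['network', 4]

# Append each pair to a separate, initially empty list.
nested = []
for pair in (a, b, c):
    nested.append(pair)

# Equivalent: wrap each pair in a one-element list and concatenate.
same = [a] + [b] + [c]

print(nested)
print(same == nested)
```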

Upvotes: 1

Views: 235

Answers (2)

titiro89

Reputation: 2108

There is no need to use reduceByKey for this problem.

  • Define the RDD:

rdd = sc.parallelize([('2002', ['cougar', 1]),('2002', ['the', 10]),('2002', ['network', 4]),('2002', ['is', 1]),('2002', ['database', 13])])

  • See the RDD values with rdd.collect():

[('2002', ['cougar', 1]), ('2002', ['the', 10]), ('2002', ['network', 4]), ('2002', ['is', 1]), ('2002', ['database', 13])]

  • Apply groupByKey and convert each group of values to a list with mapValues(list), as described in the Apache Spark docs:

rdd_nested = rdd.groupByKey().mapValues(list)

  • See the grouped RDD values with rdd_nested.collect():

[('2002', [['cougar', 1], ['the', 10], ['network', 4], ['is', 1], ['database', 13]])]
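To make the grouping step concrete without a Spark session, here is a rough plain-Python sketch of what groupByKey().mapValues(list) produces for a small in-memory list of pairs. The helper name group_pairs and the extra '2003' entry are made up for illustration; this only mimics the result shape and is not how Spark implements it.

```python
from collections import defaultdict


def group_pairs(pairs):
    """Hypothetical helper: collect the values of (key, value) pairs into one list per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        # Each value is appended whole, so it stays a separate [word, count] list.
        grouped[key].append(value)
    return list(grouped.items())


pairs = [('2002', ['cougar', 1]), ('2002', ['the', 10]), ('2003', ['network', 4])]
result = group_pairs(pairs)
print(result)
```

Because every value is appended as a unit, a key with a single value still ends up with a nested list such as [['network', 4]].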

Upvotes: 1

XYZ

Reputation: 382

I have come up with one solution:

def wagg(a, b):
    # Each argument is either a single [word, count] pair or an
    # already merged list of such pairs. Wrap single pairs so that
    # both sides are lists of pairs, then concatenate them.
    if not isinstance(a[0], list):
        a = [a]
    if not isinstance(b[0], list):
        b = [b]
    return a + b


# Note: a key that occurs only once is never passed to wagg,
# so its value stays a single [word, count] pair.
rdd2 = rdd1.reduceByKey(wagg)

Does anyone have a better solution?
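One possible simplification, as a sketch: if every value is wrapped in a one-element list before reducing, the reduce function no longer needs any type checks and can simply concatenate. The names wrap and concat are illustrative; the Spark one-liner appears only as a comment, and functools.reduce stands in for the per-key reduction so the example runs without Spark.

```python
from functools import reduce


def wrap(value):
    # Turn a single [word, count] pair into a one-element list of pairs.
    return [value]


def concat(a, b):
    # Both arguments are lists of pairs, so plain concatenation is enough.
    return a + b


# With Spark this would look like (assuming rdd1 holds the (year, [word, count]) pairs):
#     rdd2 = rdd1.mapValues(wrap).reduceByKey(concat)

# Plain-Python stand-in for the values of one key:
values = [['cougar', 1], ['the', 10], ['network', 4]]
merged = reduce(concat, map(wrap, values))
print(merged)

# A key with a single value still ends up nested:
single = reduce(concat, map(wrap, [['is', 1]]))
print(single)
```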

Upvotes: 1
