Guforu

Transforming a sparse RDD into dense lists in Spark

I have the following RDD:

((0,1), 2)
((0,2), 3)
((1,1), 3)

I'm looking for a method that converts this RDD into the following form:

([0, 2, 3],
 [0, 3, 0])

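In other words, the method should create an RDD of lists, positioned according to the keys of the initial RDD: the first key index selects the list and the second key index selects the position inside it. If a value is missing for some position, the method should put 0 there.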

I wrote the following two methods myself, and together they give me a working solution:

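def matrixForm(rdd):
    # ((x, y), k) -> (x, y, k)
    rdd2 = rdd.map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
    # group the entries by y: (y, [(x, k), ...]), ordered by y
    rdd3 = rdd2.map(lambda t: (t[1], (t[0], t[2]))).groupByKey().sortByKey()
    # sort every group by x (key= replaces the Python 2 cmp= argument)
    rdd4 = rdd3.map(lambda kv: sorted(kv[1], key=lambda p: p[0]))
    # keep only the values, as a real list
    rdd5 = rdd4.map(lambda x: [e for (_, e) in x])
    # transpose so that the lists are indexed by x instead of y
    return rddTranspose(rdd5)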

def rddTranspose(rdd):
    # number the rows, then emit one (i, j, e) triple per element
    rddT1 = rdd.zipWithIndex().flatMap(
        lambda row_i: [(row_i[1], j, e) for (j, e) in enumerate(row_i[0])])
    # group by column index j: (j, [(i, e), ...]), ordered by j
    rddT2 = rddT1.map(lambda t: (t[1], (t[0], t[2]))).groupByKey().sortByKey()
    # sort every column by row index i and keep only the values
    rddT3 = rddT2.map(lambda kv: sorted(kv[1], key=lambda p: p[0]))
    return rddT3.map(lambda x: [e for (_, e) in x])
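To see what rddTranspose does step by step, here is a local, Spark-free mirror of it on plain Python lists. It is only a sketch: `transpose_local` is a hypothetical name, a `defaultdict` stands in for `groupByKey`, and Python's `sorted` stands in for `sortByKey`.

```python
from collections import defaultdict


def transpose_local(rows):
    """Local mirror of rddTranspose (hypothetical helper, no Spark involved)."""
    # zipWithIndex + flatMap: one (i, j, e) triple per element
    triples = [(i, j, e) for i, row in enumerate(rows) for j, e in enumerate(row)]
    # groupByKey on the column index j, collecting (i, e) pairs
    columns = defaultdict(list)
    for i, j, e in triples:
        columns[j].append((i, e))
    # sortByKey on j, then sort each column by i and keep only the values
    return [[e for _, e in sorted(columns[j], key=lambda p: p[0])]
            for j in sorted(columns)]
```

On a square input such as `[[1, 2], [3, 4]]` this gives `[[1, 3], [2, 4]]`. On a ragged input such as `[[2, 3], [3]]` it returns `[[2, 3], [3]]`: the values are regrouped by position, but nothing in this step inserts a 0, so any zero padding has to happen earlier in the pipeline.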

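This works, but it does not seem efficient: the data is shuffled several times, since every groupByKey and sortByKey call triggers a shuffle and the transpose repeats the whole grouping and sorting. If anyone has time to discuss and improve my solution, please join in. Thank you in advance.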
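One possible way to cut down the shuffles is to build each dense row directly per row index, instead of grouping by column and transposing afterwards. The sketch below assumes Python 3 and PySpark's `RDD.aggregateByKey`, which copies the zero value for every key, so mutating it in the sequence function is safe. The names `add_entry`, `merge_rows` and `to_dense_rdd` are hypothetical, and the approach has not been benchmarked against the original.

```python
def add_entry(row, entry):
    """seqOp: add one (column, value) pair into a partial row (mutates and returns it)."""
    j, value = entry
    row[j] += value  # assumption: each (i, j) key occurs once; duplicates would be summed
    return row


def merge_rows(a, b):
    """combOp: merge two partial rows built on different partitions."""
    return [x + y for x, y in zip(a, b)]


def to_dense_rdd(rdd):
    """Sketch: RDD of ((i, j), k) -> RDD of dense rows ordered by i."""
    n = rdd.map(lambda kv: kv[0][1]).max() + 1  # row length = highest column index + 1
    return (rdd.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))  # (i, (j, k))
               .aggregateByKey([0] * n, add_entry, merge_rows)  # one row per i
               .sortByKey()                                     # order the rows by i
               .values())
```

Compared with the two methods above, this needs one aggregation shuffle plus one sort instead of two groupByKey/sortByKey rounds. Like any key-based grouping, it only produces rows for row indices that actually occur in the input.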

PS: A second example of input and output:

((0,0), 1)
((1,1), 1)
((2,2), 1)
((3,3), 1)

([1, 0, 0, 0],
 [0, 1, 0, 0],
 [0, 0, 1, 0],
 [0, 0, 0, 1])

The number of lists is the highest first index (id1) plus one, and the length of every list is the highest second index (id2) plus one.
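The rule above can be pinned down with a small local reference that does not use Spark, which is handy for checking any distributed version on small inputs. This is a sketch that assumes the whole input fits in memory; `to_dense_local` is a hypothetical name.

```python
def to_dense_local(entries):
    """Reference (non-Spark) version of the transformation.

    entries: iterable of ((i, j), value) pairs.
    Returns m lists of length n, where m = max(i) + 1 and n = max(j) + 1;
    positions without an entry are 0.
    """
    entries = list(entries)
    m = max(i for (i, _), _ in entries) + 1
    n = max(j for (_, j), _ in entries) + 1
    # independent rows (not [[0] * n] * m, which would share one list)
    matrix = [[0] * n for _ in range(m)]
    for (i, j), value in entries:
        matrix[i][j] = value
    return matrix
```

For the first example, `to_dense_local([((0, 1), 2), ((0, 2), 3), ((1, 1), 3)])` gives `[[0, 2, 3], [0, 3, 0]]`, and for the diagonal example it gives the 4 x 4 identity-like lists shown above.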

Answers (1)

jtitusj

Try this:

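def toRow(n, lst):
    # start from n zeros and fill in the (column, value) pairs of one row
    row = [0] * n
    for (index, val) in lst:
        row[index] = val
    return row

def toDense(rdd):
    # row length = highest column index + 1
    n = rdd.map(lambda kv: kv[0][1]).max() + 1
    # ((i, j), k) -> (i, (j, k))
    rdd1 = rdd.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    # one list of (j, k) pairs per row i, ordered by i
    rdd2 = rdd1.groupByKey().sortByKey().map(lambda x: list(x[1]))
    return rdd2.map(lambda lst: toRow(n, lst))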
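Because groupByKey only produces keys that exist, a row index with no entries at all (for example, entries only in rows 0 and 2) would be skipped rather than returned as a row of zeros. A possible variant, sketched under the assumption that the number of rows m is small enough to parallelize range(m) from the driver, unions in one placeholder per row index. The names `to_row` and `to_dense_all_rows` are hypothetical; `RDD.context`, `parallelize` and `union` are standard PySpark calls.

```python
def to_row(n, entries):
    """Start from n zeros and fill in the (column, value) pairs of one row."""
    row = [0] * n
    for index, val in entries:
        row[index] = val
    return row


def to_dense_all_rows(rdd):
    """Like toDense, but also emits all-zero rows for row indices without entries."""
    m = rdd.map(lambda kv: kv[0][0]).max() + 1  # number of rows = highest row index + 1
    n = rdd.map(lambda kv: kv[0][1]).max() + 1  # row length = highest column index + 1
    # one (i, None) placeholder per row index, so that empty rows survive the grouping
    placeholders = rdd.context.parallelize(range(m)).map(lambda i: (i, None))
    entries = rdd.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))  # (i, (j, k))
    return (entries.union(placeholders)
                   .groupByKey()
                   .sortByKey()
                   .map(lambda kv: to_row(n, [e for e in kv[1] if e is not None])))
```

The placeholders are dropped again before building each row, so rows that do have entries come out exactly as with toDense.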

Then take a look:

toDense(rdd).take(2)
