Philippe C

Reputation: 677

Empty array returned by mapPartitions in PySpark

Hello, could someone explain why mapPartitions reacts differently to these two functions? (I have looked at this thread, and I don't think my problem comes from my iterable being TraversableOnce, since I create it myself.)

# list(...) keeps the concatenation below valid on Python 3, where range is lazy
L=list(range(10))
J=list(range(5,15))
K=list(range(8,18))

data=J+K+L

def function_1(iter_listoflist):
    final_iterator=[]
    for sublist in iter_listoflist:
        final_iterator.append([x for x in sublist if x%9!=0])
    return iter(final_iterator)  

def function_2(iter_listoflist):
    final_iterator=[]
    listoflist=list(iter_listoflist)
    for i in range(len(listoflist)):
        for j in range(i+1,len(listoflist)):
            sublist=listoflist[i]+listoflist[j]
            final_iterator.append([x for x in sublist if x%9!=0])
            pass
        pass
    return iter(final_iterator)



sc.parallelize(data,3).glom().mapPartitions(function_1).collect()

returns what it should while

sc.parallelize(data,3).glom().mapPartitions(function_2).collect()

returns an empty array. I have checked the code by returning a list at the end, and it does what I want it to.

thanks for your help

Philippe C

Upvotes: 0

Views: 1476

Answers (1)

zero323

Reputation: 330113

It is actually quite simple: listoflist always has a length of 1. To understand why, think about what happens when you call glom. To quote the docs, it returns:

an RDD created by coalescing all elements within each partition into a list.

It means that when you call:

listoflist=list(iter_listoflist)

You get a list with a single element list containing all elements from that partition. Ignoring all the details:

(sc.parallelize(data, 3)
    .glom()
    .mapPartitionsWithIndex(lambda i, iter: [(i, list(iter))])
    .collect())

## [(0, [[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]]),
##  (1, [[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]]),
##  (2, [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])]

It means that

  • range(len(listoflist)) contains a single element, equal to 0.
  • range(i+1,len(listoflist)) by substitution is an empty range(1, 1)

Hence there is nothing to do and you get an empty iterator.
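
The same behaviour can be reproduced without Spark. The sketch below assumes that a partition after glom can be modelled as a plain Python iterator over exactly one list; the names partition_after_glom and two_lists are made up for illustration, and the second case is a hypothetical partition that glom would not actually produce.

```python
def function_2(iter_listoflist):
    # Same logic as in the question, minus the pass statements.
    result = []
    listoflist = list(iter_listoflist)
    for i in range(len(listoflist)):
        for j in range(i + 1, len(listoflist)):
            sublist = listoflist[i] + listoflist[j]
            result.append([x for x in sublist if x % 9 != 0])
    return iter(result)

# Assumed model of one partition after glom(): an iterator over a single list.
partition_after_glom = iter([list(range(5, 15))])
single = list(function_2(partition_after_glom))

# Hypothetical contrast: an iterator over two lists, so one (i, j) pair exists.
two_lists = iter([list(range(5, 15)), list(range(8, 18))])
pairs = list(function_2(two_lists))

print(single)      # []
print(len(pairs))  # 1
```

With only one list per partition the inner loop never runs, so the pairwise combination needs more than one list in the iterator to produce anything.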

As a side note, all of these pass statements, as well as the iter calls, are unnecessary.
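
As an illustration of that side note, function_1 could be written as a generator. This is only a sketch of one possible cleanup, not the only way to do it; the variable sample is a made-up input standing in for a partition iterator.

```python
def function_1(iter_listoflist):
    # Yield each filtered sublist directly; no accumulator list,
    # no pass statements, and no explicit iter() call.
    for sublist in iter_listoflist:
        yield [x for x in sublist if x % 9 != 0]

# Hypothetical stand-in for one partition after glom().
sample = iter([list(range(10))])
filtered = list(function_1(sample))
print(filtered)  # [[1, 2, 3, 4, 5, 6, 7, 8]]
```

A generator is already an iterator, so it can be handed to mapPartitions in the same way as the original function.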

Upvotes: 2
