Vlo

Reputation: 3188

Format rows into vectors: how to reduceByKey a list of (n_1, m_1)...(n_k, m_k) pairs into ((n_1...n_k), (m_1...m_k))

Data

An RDD read in from textFile(), consisting of (str-key, (int-id, int-value)) pairs:

[(u'ID1', (132, 1)),
 (u'ID2', (133, 3)),
 (u'ID3', (120, 5)),
 (u'ID4', (110, 0)),
 (u'ID5', (160, 2)),
 (u'ID6', (500, 9)),
 (u'ID7', (932, 8)),
 (u'ID8', (132, 1)),
 (u'ID1', (133, 6)),
 (u'ID8', (133, 1))]

Output: I would like to efficiently create an RDD of (key, DenseVector/SparseVector) pairs with as little shuffling as possible.

Edit: Based on the comment below, this cannot be done in Spark with little shuffling, whether grouping or aggregating.

DenseVector

The file being read is ordered by int-id, so if I threw out the int-id and did reduceByKey on str-key, I could form a DenseVector of the int-values:

from pyspark.mllib.linalg import DenseVector  # or pyspark.ml.linalg

rdd.map(lambda x: (x[0], [x[1]]))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda x: [x[0], DenseVector(x[1])])

This gives the correct ordering of int-values with 1 partition, but is very slow. With more than one partition and multiple workers it can be fast, but the ordering of values is then arbitrary and inconsistent across str-keys. For example, for str-keys ID1 and ID8, the desired output would be [1, 6], [3, 1] or [6, 1], [1, 3], but it cannot be [1, 6], [1, 3].

1) Is there a way to reduceByKey but preserve file/read order (or reorder the result based on int-id)?
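For reference, something like the following is roughly what I mean by reordering on int-id. It is only a sketch, assuming each record is (str-key, (int-id, int-value)) as in the sample above and that DenseVector comes from pyspark.mllib.linalg; I do not know whether it avoids the cost described above:

from pyspark.mllib.linalg import DenseVector

# Sketch: keep each (int-id, int-value) tuple through the reduce,
# then sort by int-id per key before building the DenseVector.
rdd.map(lambda x: (x[0], [x[1]]))\
    .reduceByKey(lambda a, b: a + b)\
    .mapValues(lambda pairs: DenseVector([v for _, v in sorted(pairs)]))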

SparseVector

For SparseVector, I try to feed the (int-id, int-value) pairs in directly, but as far as I know this requires aggregating them per str-key. The groupByKey() causes massive shuffling:

from pyspark.mllib.linalg import SparseVector  # or pyspark.ml.linalg

# x = [str-key, int-id, int-value] fields parsed from each line
RDD.map(lambda x: (x[0], (int(x[1]), int(x[2]))))\
    .groupByKey()\
    .mapValues(list)\
    .mapValues(sorted)\
    .mapValues(lambda x: SparseVector(N, x))  # N = vector size (> largest int-id)

The list step aggregates the data [(int-id, value), (int-id_2, value_2), ..., (int-id_n, value_n)] for each str-key. The sorted step is there because SparseVector requires a sorted list of pairs or a dict.
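For illustration, both forms below build the same vector (a sketch only; N = 1000 is an arbitrary size that must exceed the largest int-id, and I am assuming pyspark.mllib.linalg):

from pyspark.mllib.linalg import SparseVector

N = 1000  # illustrative size only
SparseVector(N, [(132, 1), (133, 6)])  # sorted list of (index, value) pairs
SparseVector(N, {132: 1, 133: 6})      # dict of index -> value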

2) Is there a way to write this more efficiently?

Upvotes: 1

Views: 146

Answers (1)

user10534398


If the data is sparse (you can compute the exact sparsity threshold, depending on the expected size of the key), groupByKey is the optimal solution; for each row you have to shuffle:

  • The key.
  • The value. Because it is a tuple of primitive values, there is no need for a full-fledged __dict__ and the size is pretty much as low as possible.

Since the (index, value) pairs in your question seem to be unique, there is no reduction in shuffle size on the value side, and any complex object (like a vector) will likely have a larger overhead than a tuple.
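As a rough, hedged illustration of that overhead (plain pickle sizes rather than Spark's serializer, so the numbers are only indicative):

import pickle
from pyspark.mllib.linalg import SparseVector

# Per-row value shuffled by groupByKey: a small (index, value) tuple...
print(len(pickle.dumps((132, 1))))
# ...versus a vector object, which carries size metadata plus index/value arrays.
print(len(pickle.dumps(SparseVector(1000, {132: 1}))))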

The only possible reduction happens on the key side. To achieve one that outweighs the increase in value size, you need reasonably dense data.

If that's the case, aggregateByKey might perform better, although the additional cost of merging can still consume the possible benefits of map-side combine:

from pyspark.mllib.linalg import SparseVector  # or pyspark.ml.linalg

def seq_func(acc, x):
    # Fold one (index, value) pair into the per-key dict, skipping explicit zeros.
    if x[1]:
        acc[x[0]] = acc.get(x[0], 0) + x[1]
    return acc

def comb_func(acc1, acc2):
    # Merge two partial dicts built on different partitions.
    for k in acc2:
        acc1[k] = acc1.get(k, 0) + acc2[k]
    return acc1

rdd.aggregateByKey(dict(), seq_func, comb_func).mapValues(lambda d: SparseVector(N, d))
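For example, on the sample data from the question (a sketch: sc is an existing SparkContext, N = 1000 is an arbitrary size chosen to exceed the largest int-id, and seq_func / comb_func are the functions defined above):

N = 1000

pairs = sc.parallelize([
    (u'ID1', (132, 1)), (u'ID1', (133, 6)),
    (u'ID8', (132, 1)), (u'ID8', (133, 1)),
])

result = pairs.aggregateByKey(dict(), seq_func, comb_func)\
    .mapValues(lambda d: SparseVector(N, d))
# result.collect() gives something like:
# [('ID1', SparseVector(1000, {132: 1.0, 133: 6.0})),
#  ('ID8', SparseVector(1000, {132: 1.0, 133: 1.0}))]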

Otherwise just groupByKey, skip sorting and use dict:

rdd.groupByKey().mapValues(lambda x: SparseVector(N, dict(x)))

Upvotes: 0
