Reputation: 3188
Data
An RDD read in from textFile(), consisting of (str-key, (int-id, int-value)) pairs:
[(u'ID1', (132, 1)),
(u'ID2', (133, 3)),
(u'ID3', (120, 5)),
(u'ID4', (110, 0)),
(u'ID5', (160, 2)),
(u'ID6', (500, 9)),
(u'ID7', (932, 8)),
(u'ID8', (132, 1)),
(u'ID1', (133, 6)),
(u'ID8', (133, 1))]
Output
I would like to efficiently create an RDD of (key, DenseVector/SparseVector) pairs with as little shuffling as possible.
Edit: Based on the comment below, it is not possible to do this in Spark regardless of group/aggregate.
DenseVector
The file that is being read is ordered by int-id, so if I throw out the int-id and reduceByKey on str-key, I can form a DenseVector of the int-values:
from pyspark.mllib.linalg import DenseVector

# drop int-id, keep int-value; concatenate the per-key lists, then build the vector
rdd.map(lambda x: (x[0], [x[1][1]]))\
   .reduceByKey(lambda a, b: a + b)\
   .map(lambda x: (x[0], DenseVector(x[1])))
This would give me the correct ordering of int-values with 1 partition, but it is very slow. With more than one partition and multiple workers it can be very fast, but the ordering is random across str-keys. For example, for str-keys ID1 and ID8 the desired output would be [1, 6], [3, 1] or [6, 1], [1, 3], but it cannot be [1, 6], [1, 3].
1) Is there a way to reduceByKey but preserve the file/read order (or reorder the result based on int-id)?
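For reference, one way to express the "reorder the result based on int-id" idea is to keep the (int-id, int-value) pairs through the reduce and sort each per-key list by int-id before dropping the ids. This is only a sketch of that idea, reusing the variable names from the snippet above, not a claim that it shuffles any less:
from pyspark.mllib.linalg import DenseVector

# sketch: keep (int-id, int-value) pairs, sort by int-id per key, then drop the ids
rdd.map(lambda x: (x[0], [x[1]]))\
   .reduceByKey(lambda a, b: a + b)\
   .mapValues(lambda pairs: DenseVector([v for _, v in sorted(pairs)]))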
SparseVector
For SparseVector, I tried to feed the list of (int-id, int-value) pairs in directly, but afaik this requires aggregation across str-key. groupByKey() causes massive shuffling.
from pyspark.mllib.linalg import SparseVector

# collect all (int-id, int-value) pairs per str-key, sort them by int-id,
# then build a SparseVector of size N
rdd.map(lambda x: (x[0], (int(x[1][0]), int(x[1][1]))))\
   .groupByKey()\
   .mapValues(list)\
   .mapValues(sorted)\
   .mapValues(lambda x: SparseVector(N, x))
The list aggregates the data [(int-id, value), (int-id_2, value_2), ..., (int-id_n, value_n)] for each str-key. sorted is there since SparseVector requires a sorted list or a dict.
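As a small aside on that requirement, SparseVector can be built either from a sorted list of (index, value) pairs or from a dict keyed by index (a tiny local illustration, no Spark job involved):
from pyspark.mllib.linalg import SparseVector

# both produce the same vector; the dict form needs no explicit sorting step
SparseVector(1000, [(132, 1.0), (133, 6.0)])
SparseVector(1000, {132: 1.0, 133: 6.0})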
2) Is there a way to write this more efficiently?
Upvotes: 1
Views: 146
If the data is sparse (you can compute the exact sparsity threshold, depending on the expected size of the key), groupByKey is the optimal solution. For each row you have to shuffle only a tuple of primitive values; there is no need for a full-fledged __dict__, and the size is pretty much as low as possible. Since the (index, value) pairs in your question seem to be unique, there is no possible reduction in shuffle size on the value side, and any complex object (like a vector) will likely have a larger overhead than a tuple.
The only possible reduction happens on the key side. To achieve one that outweighs the increase in value size, you need reasonably dense data. If that's the case, aggregateByKey might perform better, although the additional cost of merging can still consume the possible benefits of map-side combine.
from pyspark.mllib.linalg import SparseVector

def seq_func(acc, x):
    # fold one (index, value) pair into the per-key dict, skipping zero values
    if x[1]:
        acc[x[0]] = acc.get(x[0], 0) + x[1]
    return acc

def comb_func(acc1, acc2):
    # merge two partial dicts produced on different partitions
    for k in acc2:
        acc1[k] = acc1.get(k, 0) + acc2[k]
    return acc1

rdd.aggregateByKey(dict(), seq_func, comb_func).mapValues(lambda d: SparseVector(N, d))
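For example, applied to the rdd from the question (with seq_func and comb_func as defined above, and an assumed dimension N = 1000 that is larger than every int-id), the result for a single key would look like this:
N = 1000  # assumed dimension, must exceed the largest int-id

vectors = rdd.aggregateByKey(dict(), seq_func, comb_func)\
             .mapValues(lambda d: SparseVector(N, d))

vectors.lookup(u'ID1')
# [SparseVector(1000, {132: 1.0, 133: 6.0})]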
Otherwise just groupByKey, skip sorting and use dict:
rdd.groupByKey().mapValues(lambda x: SparseVector(N, dict(x)))
Upvotes: 0