Gustav

Reputation: 109

Pyspark: merging values in a nested list

I have a pair RDD with the structure: [(key, [(timestring, value)])]

Example:

[("key1", [("20161101", 23), ("20161101", 41), ("20161102", 66),...]),
 ("key2", [("20161101", 86), ("20161101", 9), ("20161102", 11),...])
  ...]

I want to process the list for each key, grouping by timestring and calculating the mean of all values that share the same timestring. So the above example would become:

[("key1", [("20161101", 32),...]),
 ("key2", [("20161101", 47.5),...])
  ...]

I am struggling to find a solution that uses only PySpark methods in a single step. Is that possible at all, or do I need some intermediate steps?

Upvotes: 2

Views: 192

Answers (1)

user6022341

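You can define a function that groups one key's list by timestring and averages the values in each group:

from itertools import groupby
from operator import itemgetter
import numpy as np

def mapper(xs):
    # groupby only merges adjacent items, so sort by timestring first
    by_time = sorted(xs, key=itemgetter(0))
    return [(k, np.mean([v for _, v in vs])) for k, vs in groupby(by_time, key=itemgetter(0))]

Then apply it to each key's list with mapValues: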

rdd = sc.parallelize([
    ("key1", [("20161101", 23), ("20161101", 41), ("20161102", 66)]),
    ("key2", [("20161101", 86), ("20161101", 9), ("20161102", 11)])
])

# mapValues is a lazy transformation; collect() brings the results back to the driver
rdd.mapValues(mapper).collect()
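
If you would rather avoid the sort and the NumPy dependency, the per-key function could also be written with a dictionary of running sums and counts. This is only a minimal sketch in plain Python: the name mean_by_timestring is made up for illustration, and it assumes each key's list fits in memory on a single executor, as the mapper above does too.

```python
from collections import defaultdict

def mean_by_timestring(pairs):
    """Average the values that share a timestring (hypothetical helper name)."""
    totals = defaultdict(lambda: [0.0, 0])  # timestring -> [running sum, count]
    for timestring, value in pairs:
        acc = totals[timestring]
        acc[0] += value
        acc[1] += 1
    # Sort by timestring so the output order is deterministic.
    return [(ts, s / n) for ts, (s, n) in sorted(totals.items())]

# Check the function locally on the list for "key1" from the question.
pairs = [("20161101", 23), ("20161101", 41), ("20161102", 66)]
print(mean_by_timestring(pairs))  # [('20161101', 32.0), ('20161102', 66.0)]
```

It should drop in the same way, as rdd.mapValues(mean_by_timestring), though I have only checked the function itself on plain lists, not inside a Spark job.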

Upvotes: 1
