Brian Bruggeman

Reputation: 5324

Pyspark aggregation - each field aggregated in a different way

I have some data that has four basic fields:

  1. Field 1 is a key for the data
  2. Field 2 should be a set of all unique values for this field
  3. Field 3 is a minimum value (timestamp)
  4. Field 4 is a maximum value (timestamp)

The original code looks like this:

data = (
    dataframe
    .rdd
    # flatten rows
    .flatMap(lambda x: x)
    # Parse JSON
    .flatMap(lambda x: encode_json(x))
    # Capture values
    .map(lambda x: [
        # Merge 'field1', 'field2' --> 'field1, field2'
        ','.join(_ for _ in [x.get('metadata_value'), x.get('field2')]),
        # Create pairing of low and high timestamps
        [x.get('low'), x.get('high')]
    ])
    # Aggregate into a list of low/high timestamps per 'field1, field2'
    .aggregateByKey(list(), lambda u, v: u + [v], lambda u1, u2: u1 + u2)
    # Flatten keys 'ip,guid' --> 'ip', 'guid'
    .map(lambda x: (x[0].split(',')[0], x[0].split(',')[1], x[1], sum(1 for _ in x[1])))
    # Reduce timestamps to single values:  [s1, e1], [s2, e2], ... --> s_min, e_max
    .map(lambda x: (x[0], x[1], min(_[0] for _ in x[2]), max(_[1] for _ in x[2]), x[3]))
)

Original output looks like this:

a | x012345 | 20160103 | 20160107
a | x013579 | 20160101 | 20160106

New output should look like this:

a | {x012345,x013579} | 20160101 | 20160107

Upvotes: 0

Views: 56

Answers (1)

Manu Valdés

Reputation: 2372

Add these two transforms to your current output: map it to a pair RDD keyed on field 1, then reduce each remaining field with its corresponding operation (set, min, max).

# Wrap field 2 in a set up front so every value has the same shape
data.map(lambda reg: (reg[0], ({reg[1]}, reg[2], reg[3]))) \
    .reduceByKey(lambda v1, v2: (v1[0] | v2[0], min(v1[1], v2[1]), max(v1[2], v2[2])))
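
To check the merge logic without a Spark cluster, here is a minimal plain-Python sketch of the same idea. The sample rows are taken from the question's current output; `to_pair`, `merge`, and `reduce_by_key` are hypothetical names I made up for illustration, and `reduce_by_key` is only a local stand-in for what `RDD.reduceByKey` does per key, not Spark itself. Wrapping field 2 in a one-element set before reducing matters for two reasons: a key that appears only once never goes through the reduce function, and a set cannot be nested inside another set literal, so unioning sets is the safe way to accumulate.

```python
# Sample rows in the shape of the question's current output:
# (field1, field2, low, high)
rows = [
    ("a", "x012345", 20160103, 20160107),
    ("a", "x013579", 20160101, 20160106),
]


def to_pair(reg):
    """Map a row to (key, ({field2}, low, high)).

    Field 2 is wrapped in a set so the value has the same shape
    before and after merging.
    """
    return reg[0], ({reg[1]}, reg[2], reg[3])


def merge(v1, v2):
    """Union the sets, keep the smallest low and the largest high."""
    return v1[0] | v2[0], min(v1[1], v2[1]), max(v1[2], v2[2])


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey (illustration only)."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


aggregated = reduce_by_key(map(to_pair, rows), merge)
```

On an RDD, the same `to_pair` and `merge` functions could be passed to `.map(...)` and `.reduceByKey(...)` directly, assuming the rows have the four leading fields shown here.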

Upvotes: 1
