Brian Bruggeman

Reputation: 5314

How to merge multiple JSON data rows based on a field in pyspark with a given reduce function

How do I merge the JSON data rows as shown below using the merge function below with pyspark?

Note: assume this is just a minimal example and I have thousands of rows of data to merge. What is the most performant solution? For better or for worse, I must use pyspark.

Input:

data = [
    {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'},
    {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'},
    {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'},
    ...  More ...
]

Desired Output:

combined_result = [
    {'name': 'Joe Schmoe', 'addresses': [('20080411204445', '100 Sunder Ct'), ('20040218165319', '100 Lee Ave')]},
    {'name': 'John Doe', 'addresses': [('20120309173318', '1818 Westminster')]},
    ... More ...
]

Merge function:

def reduce_on_name(a, b):
    '''Combines two JSON data rows based on name'''
    merged = {}
    if a['name'] == b['name']:
        addresses = (a['timestamp'], a['address']), (b['timestamp'], b['address'])
        merged['name'] = a['name']
        merged['addresses'] = addresses
    return merged
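
In Spark terms, the natural home for a pairwise merge like this would seem to be reduceByKey on an RDD keyed by name, roughly like the sketch below (assuming sc is the SparkContext provided by pyspark):

# Sketch only: key each row by name, then fold rows for the same name pairwise.
keyed = sc.parallelize(data).keyBy(lambda row: row['name'])
merged = keyed.reduceByKey(reduce_on_name).values().collect()

But as written, reduce_on_name only handles two raw rows: its output has no 'timestamp' or 'address' keys, so a third row for the same name would raise a KeyError, and a name that appears only once comes back unmerged, so this alone doesn't produce the desired output.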

Upvotes: 1

Views: 1460

Answers (2)

Brian Bruggeman

Reputation: 5314

All right, using maxymoo's example, I put together my own version. It's not exactly what I was looking for, but it gets me closer to how I want to solve this particular problem: without lambdas and with reusable code.

#!/usr/bin/env pyspark
# -*- coding: utf-8 -*-
data = [
    {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'},
    {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'},
    {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'},
]


def combine(field):
    '''Returns a key function which extracts a specific field for grouping

    Args:
        field(str): data field to use for merging

    Returns:
        func: returns a function which supplies the data for the field
    '''

    def _reduce_this(data):
        '''Returns the field value using data'''
        return data[field]

    return _reduce_this


def aggregate(*fields):
    '''Merges data based on a list of fields

    Args:
        fields(list): a list of fields that should be used as a composite key

    Returns:
       func: a function which does the aggregation
    '''

    def _merge_this(iterable):
        name, iterable = iterable
        new_map = dict(name=name, window=dict(max=None, min=None))
        for data in iterable:
            for field, value in data.items():
                if field in fields:
                    new_map[field] = value
                else:
                    new_map.setdefault(field, set()).add(value)
        return new_map

    return _merge_this

# sc provided by pyspark context
combined = sc.parallelize(data).groupBy(combine('name'))
reduced = combined.map(aggregate('name'))
output = reduced.collect()
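
With the sample data, output comes back as something like this (the non-key fields end up as sets, and set ordering is arbitrary):

[{'name': 'Joe Schmoe',
  'window': {'max': None, 'min': None},
  'timestamp': {'20040218165319', '20080411204445'},
  'address': {'100 Lee Ave', '100 Sunder Ct'}},
 {'name': 'John Doe',
  'window': {'max': None, 'min': None},
  'timestamp': {'20120309173318'},
  'address': {'1818 Westminster'}}]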

Upvotes: 0

maxymoo

Reputation: 36545

I think it would be something like this:

(sc.parallelize(data)
   .groupBy(lambda x: x['name'])
   .map(lambda t: {'name': t[0],
                   'addresses': [(x['timestamp'], x['address']) for x in t[1]]})
   .collect())
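
Since the question also asks about performance: groupBy ships every record for a name across the shuffle, whereas reduceByKey can merge partial results map-side first. A reduceByKey sketch of the same idea (again assuming sc and data from the question) might look like:

def to_pair(row):
    # Key each row by name and wrap its (timestamp, address) pair in a list
    # so the reducer can simply concatenate lists.
    return row['name'], [(row['timestamp'], row['address'])]

result = (sc.parallelize(data)
            .map(to_pair)
            .reduceByKey(lambda a, b: a + b)  # merge the per-name address lists
            .map(lambda kv: {'name': kv[0], 'addresses': kv[1]})
            .collect())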

Upvotes: 1
