jmoore255

Reputation: 321

python list to dictionary for dataflow

I am trying to convert a JSON file into a dictionary and apply key/value pairs, so I can then use GroupByKey() to deduplicate the key/value pairs.

This is the original content of the file:

{"tax_pd":"200003","ein":"720378282"} {"tax_pd":"200012","ein":"274027765"} {"tax_pd":"200012","ein":"042746989"} {"tax_pd":"200012","ein":"205993971"}

I have formatted it like so:

(u'201208', u'010620100')
(u'201208', u'860785769')
(u'201208', u'371650138')
(u'201208', u'237253410')

I want to turn these into key/value pairs so I can apply GroupByKey in my Dataflow pipeline. I believe I need to turn it into a dictionary first?

I'm new to Python and the Google Cloud tools, so any help would be great!

EDIT: Code snippets

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadInputText' >> beam.io.ReadFromText(known_args.input)
     | 'YieldWords' >> beam.ParDo(ExtractWordsFn())
     # | 'GroupByKey' >> beam.GroupByKey()
     | 'WriteInputText' >> beam.io.WriteToText(known_args.output))

class ExtractWordsFn(beam.DoFn):
    def process(self, element):
        words = re.findall(r'[0-9]+', element)
        yield tuple(words)
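For reference, GroupByKey expects a PCollection of 2-tuples, so if each input line produces exactly one (tax_pd, ein) pair, a sketch of the intended pipeline might look like the following (ExtractPairsFn and the length guard are illustrative assumptions, not tested against the real data; pipeline_options and known_args are as in the snippet above):

import re

import apache_beam as beam

class ExtractPairsFn(beam.DoFn):
    def process(self, element):
        # Pull the two numeric fields out of the line and emit a (key, value) 2-tuple
        numbers = re.findall(r'[0-9]+', element)
        if len(numbers) == 2:  # skip malformed lines (assumption)
            yield (numbers[0], numbers[1])

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadInputText' >> beam.io.ReadFromText(known_args.input)
     | 'YieldPairs' >> beam.ParDo(ExtractPairsFn())
     | 'GroupByKey' >> beam.GroupByKey()
     | 'WriteOutputText' >> beam.io.WriteToText(known_args.output))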

Upvotes: 1

Views: 1044

Answers (1)

C.Nivs

Reputation: 13106

A quick pure-Python solution would be:

import json

# one JSON object per line, so parse line by line
with open('path/to/my/file.json') as fh:
    lines = [json.loads(l) for l in fh]

# [{'tax_pd': '200003', 'ein': '720378282'}, {'tax_pd': '200012', 'ein': '274027765'}, {'tax_pd': '200012', 'ein': '042746989'}, {'tax_pd': '200012', 'ein': '205993971'}]

Looking at your data, tax_pd isn't unique, so a flat tax_pd:ein mapping would lose records. Assuming there will be collisions, you could do the following:

myresults = {}

for line in lines:
    # I'm assuming we want to use tax_pd as the key, and ein as the value, but this can be extended to other keys

    # This will return None if the tax_pd is not already found
    if not myresults.get(line.get('tax_pd')):
        myresults[line.get('tax_pd')] = [line.get('ein')]
    else:
        myresults[line.get('tax_pd')] = list(set([line.get('ein'), *myresults[line.get('tax_pd')]]))

#results
#{'200003': ['720378282'], '200012': ['205993971', '042746989', '274027765']}

This way you have unique keys, each mapped to a list of unique ein values. Not completely sure if that's what you're going for or not. set will automatically dedup the list, and wrapping it in list converts it back to a list.
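As an aside, a collections.defaultdict of sets from the standard library does the same grouping and dedup with less bookkeeping; this is just an alternative sketch, not the approach above:

from collections import defaultdict

# Collect eins under each tax_pd; a set dedups automatically
grouped = defaultdict(set)
for line in lines:
    grouped[line['tax_pd']].add(line['ein'])

# Convert the sets back to lists to match the shape above
myresults = {k: list(v) for k, v in grouped.items()}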

You can then look up by the tax_pd explicitly:

myresults.get('200012')
# ['205993971', '042746989', '274027765']

EDIT: To read from Cloud Storage, the code snippet here, translated to be a bit easier to use:

with gcs.open(filename) as fh:
    lines = fh.read().split('\n')

You can set up your gcs client object by following their API docs.
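If you're on the newer google-cloud-storage client library rather than the App Engine cloudstorage module, a rough equivalent sketch would be the following (bucket and object names are placeholders):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('my-bucket')          # placeholder bucket name
blob = bucket.blob('path/to/my/file.json')   # placeholder object path
lines = blob.download_as_text().splitlines()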

Upvotes: 2
