Reputation: 321
I am trying to convert a JSON file into a dictionary of key/value pairs, so I can then use GroupByKey() to deduplicate them.
This is the original content of the file:
{"tax_pd":"200003","ein":"720378282"}
{"tax_pd":"200012","ein":"274027765"}
{"tax_pd":"200012","ein":"042746989"}
{"tax_pd":"200012","ein":"205993971"}
I have formatted it like so:
(u'201208', u'010620100')
(u'201208', u'860785769')
(u'201208', u'371650138')
(u'201208', u'237253410')
I want to turn these into key/value pairs so I can apply GroupByKey in my Dataflow pipeline. I believe I need to turn it into a dictionary first?
I'm new to Python and the Google Cloud tools, so any help would be great!
EDIT : Code snippets
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadInputText' >> beam.io.ReadFromText(known_args.input)
     | 'YieldWords' >> beam.ParDo(ExtractWordsFn())
     # | 'GroupByKey' >> beam.GroupByKey()
     | 'WriteInputText' >> beam.io.WriteToText(known_args.output))

class ExtractWordsFn(beam.DoFn):
    def process(self, element):
        words = re.findall(r'[0-9]+', element)
        yield tuple(words)
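In other words, I think each element needs to come out of the DoFn as a (key, value) 2-tuple. A minimal sketch of what I'm aiming for, parsing the JSON directly instead of regex-matching digits (field names taken from the sample data above):

import json

import apache_beam as beam

class ExtractKeyValueFn(beam.DoFn):
    """Parses one JSON line and emits a (tax_pd, ein) pair for GroupByKey."""
    def process(self, element):
        record = json.loads(element)
        yield (record['tax_pd'], record['ein'])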
Upvotes: 1
Views: 1044
Reputation: 13106
A quick pure-Python solution would be:
import json

with open('path/to/my/file.json', 'rb') as fh:
    lines = [json.loads(l) for l in fh.readlines()]

# lines:
# [{'tax_pd': '200003', 'ein': '720378282'},
#  {'tax_pd': '200012', 'ein': '274027765'},
#  {'tax_pd': '200012', 'ein': '042746989'},
#  {'tax_pd': '200012', 'ein': '205993971'}]
Looking at your data, tax_pd is not unique, so you can't build a one-to-one key:value mapping from tax_pd to ein. Assuming there will be collisions, you could do the following:
myresults = {}
for line in lines:
    # Use tax_pd as the key and ein as the value; this can be extended to other keys.
    # .get() returns None if the tax_pd has not been seen yet.
    if not myresults.get(line.get('tax_pd')):
        myresults[line.get('tax_pd')] = [line.get('ein')]
    else:
        myresults[line.get('tax_pd')] = list(set([line.get('ein'), *myresults[line.get('tax_pd')]]))

# results:
# {'200003': ['720378282'], '200012': ['205993971', '042746989', '274027765']}
This way you have unique keys, with lists of corresponding unique ein values. Not completely sure if that's what you're going for or not. set will automatically dedup the list, and the wrapping list converts the data type back. You can then look up by the tax_pd explicitly:
myresults.get('200012')
# ['205993971', '042746989', '274027765']
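As a design note, the same dedup falls out more naturally if you accumulate into sets directly, e.g. with collections.defaultdict; a sketch under the same assumptions (lines parsed as above), not part of the original code:

from collections import defaultdict

myresults = defaultdict(set)
for line in lines:
    # Sets dedup on insert, so no list/set round-trips are needed.
    myresults[line['tax_pd']].add(line['ein'])

# Convert the sets back to (sorted) lists if list output is required.
myresults = {k: sorted(v) for k, v in myresults.items()}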
EDIT: To read from the cloud storage, the code snippet here translated to be a bit easier to use:
with gcs.open(filename) as fh:
lines = fh.read().split('\n')
You can set up your gcs object using their API docs.
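If you're using the google-cloud-storage client library instead, the equivalent read looks roughly like this (the bucket and object names are hypothetical):

from google.cloud import storage  # pip install google-cloud-storage

client = storage.Client()
bucket = client.bucket('my-bucket')           # hypothetical bucket name
blob = bucket.blob('path/to/my/file.json')    # hypothetical object path
lines = blob.download_as_text().splitlines()  # one JSON object per line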
Upvotes: 2