Reputation: 321
I am trying to process a CSV file into a dict using a Dataflow template and Python.
As it is a template I have to use ReadFromText from the textio module, to be able to provide the path at runtime.
| beam.io.ReadFromText(contact_options.path)
All I need is to be able to extract the first line of this text/csv file, I can then use this data in DictReader as the fieldnames.
If I use split lines it brings back a each element of the text file in a list:
return element.splitlines()
or
csv_data = []
split_element = element.split('\n')
for row in split_element:
csv_data.append(row)
return csv_data
['phone_number', 'cid', 'first_name', 'last_name']
[' ', '101XXXXX', 'MurXXX', 'LevXXXX']
['3052XXXXX', '109XXXXX', 'MerXXXX', 'CoXXXX']
['954XXXXX', '10XXXXXX', 'RoXXXX', 'MaXXXXX']
Although If I then use say element[0], it just brings everythin back without the list brackets. I have also tried splitting by '\n', then using a for loop to produce a list object, although it produces almost the same result.
I cannot rely on using predetermined fieldnames as the csv files to be processed will all have different fieldnames and DictReader will not work effectively without fieldnames given.
EDIT:
The expected output is:
[{'phone_Number': '561XXXXX', 'first_Name': '', 'last_Name': 'BeXXXX', 'cid': '745XXXXX'}, {'phone_Number': '561XXXXX', 'first_Name': 'A', 'last_Name': 'BXXXX', 'cid': '61XXXXX'}]
EDIT:
Element contents:
"phone_Number","cid","first_Name","last_Name"
"5616XXXXX","745XXXX","","BeXXXXX"
"561XXXXXX","61XXXXX","A","BXXXXXXt"
"95XXXXXXX","6XXXXXX","A","BXXXXXX"
"727XXXXXX","98XXXXXX","A","CaXXXXXX"
Upvotes: 0
Views: 595
Reputation: 321
I was able to figure this problem out with inspiration from @mad_'s answer, but this still didn't give me the correct answer initally, as I needed to first group my pcollection into one element. I found a way of doing this inspired from this answer from Jiayuan Ma, and slightly altered it as so:
class Group(beam.DoFn):
def __init__(self):
self._buffer = []
def process(self, element):
self._buffer.append(element)
def finish_bundle(self):
if len(self._buffer) != 0:
yield list(self._buffer)
self._buffer = []
lines = p | 'File reading' >> ReadFromText(known_args.input)
| 'Group' >> beam.ParDo(Group(known_args.N)
...
Thus it grouped the entire CSV file as one object, and then I was able to apply mad_'s method to turn it into a dictionary.
Upvotes: 1
Reputation: 8273
Use Pandas to load the values and use first line as colheaders
import pandas as pd
a_big_list=[['phone_number', 'cid', 'first_name', 'last_name'],
[' ', '101XXXXX', 'MurXXX', 'LevXXXX'],
['3052XXXXX', '109XXXXX', 'MerXXXX', 'CoXXXX'],
['954XXXXX', '10XXXXXX', 'RoXXXX', 'MaXXXXX']]
df=pd.DataFrame(a_big_list[1:],columns=a_big_list[0])
df.to_dict('records')
#[{'cid': '101XXXXX',
'first_name': 'MurXXX',
'last_name': 'LevXXXX',
'phone_number': ' '},
{'cid': '109XXXXX',
'first_name': 'MerXXXX',
'last_name': 'CoXXXX',
'phone_number': '3052XXXXX'},
{'cid': '10XXXXXX',
'first_name': 'RoXXXX',
'last_name': 'MaXXXXX',
'phone_number': '954XXXXX'}]
Upvotes: 1