I am pretty new to Apache Beam and I am running into the following problem with a simple task: I am trying to create a new .csv file starting from an .xlsx Excel file. To do this I am using Apache Beam with Python 3 and the Pandas library. I admit that all of these topics are pretty new to me.
I am working on Google Colab, but I don't think that detail is significant.
I installed Apache Beam and Pandas like this (the ! is just how Google Colab runs a shell command):
!{'pip install --quiet apache-beam pandas'}
And this is my Python code implementing the Apache Beam pipeline:
import apache_beam as beam
import pandas as pd

def parse_excel(line):
    # Use the pandas library to parse the line into a DataFrame
    df = pd.read_excel(line)
    print("DATAFRAME")
    # Convert the DataFrame to a list of dictionaries, where each dictionary represents a row in the DataFrame
    # and has keys that are the column names and values that are the cell values
    return [row.to_dict() for _, row in df.iterrows()]

def print_json(json_object):
    # Print the JSON object
    print(json_object)

def run(argv=None):
    print("START run()")
    p = beam.Pipeline()
    # Read the Excel file as a PCollection
    lines = (
        p
        | 'Read the Excel file' >> beam.io.ReadFromText('Pazienti_export_reduced.xlsx')
        | "Convert to pandas DataFrame" >> beam.Map(lambda x: pd.DataFrame(x))
        | "Write to CSV" >> beam.io.WriteToText(
            'data/csvOutput', file_name_suffix=".csv", header=True
        )
    )
    print("after lines pipeline")
    # Parse the lines using the pandas library
    #json_objects = lines | 'ParseExcel' >> beam.Map(parse_excel)
    # Print the values of the json_objects PCollection
    #json_objects | 'PrintJSON' >> beam.ParDo(print_json)

if __name__ == '__main__':
    print("START main()")
    print(beam.__version__)
    print(pd.__version__)
    run()
When I run it I get no error, but my data folder is still empty. Basically, it seems that the expected csvOutput.csv output file was not created at the end of my pipeline.
What is wrong? What am I missing? How can I try to fix my code?
You are defining your pipeline, but not running it. You need to do either
with beam.Pipeline(...) as p:
    ...
    # p.run() called on __exit__
or
p = beam.Pipeline(...)
...
p.run().wait_until_finish()
Note that beam.io.ReadFromText does not work on .xlsx files (it reads text files line by line, while an .xlsx file is a binary ZIP archive), and likewise WriteToText does not accept a PCollection of Pandas DataFrames as input. (This would be clearer if Python were statically typed.) Instead, what you might want to do is something like
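with beam.Pipeline(...) as p:
    filenames = p | beam.Create(['Pazienti_export_reduced.xlsx', ...])
    # parse_excel returns a list of row dicts per file, so FlatMap emits one element per row
    rows_as_dicts = filenames | beam.FlatMap(parse_excel)
    # COL_NAMES: the list of column names to write, in output order
    csv_lines = rows_as_dicts | beam.Map(
        lambda row: ','.join(str(row[c]) for c in COL_NAMES))
    csv_lines | beam.io.WriteToText('out.csv', header=...)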
Even easier would be to use the Beam DataFrame API:
import apache_beam as beam
from apache_beam.dataframe.io import read_excel

with beam.Pipeline(...) as p:
    data = p | read_excel("/path/to/*.xlsx")
    data.to_csv("out.csv")