AndreaNobili

Reputation: 42957

Why is this Apache Beam pipeline, which reads an Excel file and creates a .csv from it, not working?

I am fairly new to Apache Beam and I am running into the following problem with this simple task: I am trying to create a new .csv file starting from an .xlsx Excel file. To do this I am using Apache Beam with Python 3 and the Pandas library. I admit that all of these topics are pretty new to me.

I am working on Google Colab, but I don't think that is significant.

I installed Apache Beam and Pandas this way (the ! is just how Google Colab runs a shell command):

!{'pip install --quiet apache-beam pandas'}

And this is my Python code implementing the Apache Beam pipeline:

import apache_beam as beam
import pandas as pd

def parse_excel(line):
  # Use the pandas library to parse the line into a DataFrame
  df = pd.read_excel(line)
  print("DATAFRAME")

  # Convert the DataFrame to a list of dictionaries, where each dictionary represents a row in the DataFrame
  # and has keys that are the column names and values that are the cell values
  return [row.to_dict() for _, row in df.iterrows()]

def print_json(json_object):
  # Print the JSON object
  print(json_object)

def run(argv=None):
  print("START run()")
  p = beam.Pipeline()

  # Read the Excel file as a PCollection
  lines = (
             p 
             | 'Read the Excel file' >> beam.io.ReadFromText('Pazienti_export_reduced.xlsx')
             | "Convert to pandas DataFrame" >> beam.Map(lambda x: pd.DataFrame(x))
             | "Write to CSV" >> beam.io.WriteToText(
                'data/csvOutput', file_name_suffix=".csv", header=True
            )
          )
  
  print("after lines pipeline")

  # Parse the lines using the pandas library
  #json_objects = lines | 'ParseExcel' >> beam.Map(parse_excel)

  # Print the values of the json_objects PCollection
  #json_objects | 'PrintJSON' >> beam.ParDo(print_json)
  

if __name__ == '__main__':
  print("START main()")
  print(beam.__version__)
  print(pd.__version__)
  run()

When I run it I get no error, but my data folder is still empty. It seems that the expected csvOutput.csv output file was not created at the end of my pipeline.

What is wrong? What am I missing? How can I try to fix my code?

Upvotes: 1

Views: 948

Answers (1)

robertwb

Reputation: 5104

You are defining your pipeline, but not running it. You need to do either

with beam.Pipeline(...) as p:
  ...
  # p.run() called on __exit__

or

p = beam.Pipeline(...)
...
p.run().wait_until_finish()

Note that beam.io.ReadFromText does not work on xlsx files, and likewise WriteToText does not accept a PCollection of Pandas DataFrames as input. (This would be clearer if Python were statically typed.) Instead, you might want to do something like

with beam.Pipeline(...) as p:
  filenames = p | beam.Create(['Pazienti_export_reduced.xlsx', ...])
  # parse_excel returns a list of row dicts, so FlatMap emits one element per row
  rows_as_dicts = filenames | beam.FlatMap(parse_excel)
  # COL_NAMES is the list of column names, in output order
  csv_lines = rows_as_dicts | beam.Map(
      lambda row: ','.join(str(row[c]) for c in COL_NAMES))
  csv_lines | beam.io.WriteToText('out.csv', header=...)
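
One caveat with joining values on ',' is that a cell containing a comma or a double quote produces a malformed line. A minimal sketch of a safer formatter, using only the standard csv module, might look like the following. The names COL_NAMES, to_csv_line and csv_header are hypothetical and the column names are placeholders, not taken from the original spreadsheet.

```python
import csv
import io

# Hypothetical column names, for illustration only; use the real
# header names of your spreadsheet, in the order you want them written.
COL_NAMES = ["id", "name", "note"]


def to_csv_line(row, columns=COL_NAMES):
    """Format one row dict as a single CSV line, without a trailing newline."""
    buffer = io.StringIO()
    # lineterminator="" because WriteToText appends the newline itself.
    writer = csv.writer(buffer, lineterminator="")
    # Missing keys become empty fields instead of raising KeyError.
    writer.writerow([row.get(c, "") for c in columns])
    return buffer.getvalue()


def csv_header(columns=COL_NAMES):
    """Build the header line, quoted the same way as the data lines."""
    return to_csv_line(dict(zip(columns, columns)), columns)
```

Assuming the pipeline above, this could be plugged in as beam.Map(to_csv_line) in place of the lambda, with header=csv_header() passed to WriteToText, which expects the header as a string.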

Even easier would be to use the Beam DataFrames API:

import apache_beam as beam
from apache_beam.dataframe.io import read_excel

with beam.Pipeline(...) as p:
  data = p | read_excel("/path/to/*.xlsx")
  data.to_csv("out.csv")

Upvotes: 1
