Nagesh Singh Chauhan

Reputation: 784

Input file is not being read by pd.read_csv

I'm trying to read a file stored in Google Cloud Storage from Apache Beam using pandas, but I'm getting an error.

def Panda_a(self):
    import pandas as pd
    data = 'gs://tegclorox/Input/merge1.csv'
    df1 = pd.read_csv(data, names = ['first_name', 'last_name', 'age', 
         'preTestScore', 'postTestScore'])
    return df1
ip2 = p |'Split WeeklyDueto' >> beam.Map(Panda_a)
ip7 = ip2 | 'print' >> beam.io.WriteToText('gs://tegclorox/Output/merge1234')

When I execute the above code, the error says the path does not exist. Any idea why?

Upvotes: 0

Views: 492

Answers (1)

jkff

Reputation: 17913

A bunch of things are wrong with this code.

  • Trying to get Pandas to read a file from Google Cloud Storage. Pandas does not support the Google Cloud Storage filesystem (as @Andrew pointed out, the documentation says the supported schemes are http, ftp, s3, file). However, you can use the Beam FileSystems.open() API to get a file object, and give that object to Pandas instead of the file path.
  • p | ... >> beam.Map(...) - beam.Map(f) transforms every element of the input PCollection using the given function f; it can't be applied to the pipeline itself. It seems that in your case, you simply want to run the Pandas code without any input. You can simulate that by supplying a bogus input, e.g. beam.Create(['ignored']).
  • beam.Map(f) requires f to return a single value (or rather: if f returns a list, Beam will interpret that list as a single value), but your code is giving it a function that returns a Pandas dataframe. I strongly doubt that you want to create a PCollection containing a single element where this element is the entire dataframe - more likely, you're looking to have 1 element for every row of the dataframe. For that, you need to use beam.FlatMap, and you need df.iterrows() or something like it.
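
Putting the three points above together might look roughly like this. It is a minimal sketch, not something run against your bucket: the helper names (rows_from_file, read_rows, format_row, build_pipeline) are made up for illustration, and instead of a dummy 'ignored' element it seeds the pipeline with the file path itself, which is a small variation on the same idea.

```python
import io

COLUMNS = ['first_name', 'last_name', 'age', 'preTestScore', 'postTestScore']


def rows_from_file(file_obj):
    """Parse an open CSV file object with Pandas and yield one dict per row."""
    import pandas as pd  # imported lazily, as in the question
    # names= as in the question; this assumes the file has no header row.
    df = pd.read_csv(file_obj, names=COLUMNS)
    for _, row in df.iterrows():
        yield row.to_dict()


def read_rows(path):
    """Open `path` through Beam's filesystem layer, which understands gs://."""
    from apache_beam.io.filesystems import FileSystems
    f = FileSystems.open(path)
    try:
        yield from rows_from_file(f)
    finally:
        f.close()


def format_row(row):
    """Turn a row dict back into a comma-separated line for WriteToText."""
    return ','.join(str(value) for value in row.values())


def build_pipeline(p, input_path, output_prefix):
    """Attach the read/parse/write steps to an existing pipeline `p`."""
    import apache_beam as beam
    return (
        p
        | 'Seed' >> beam.Create([input_path])               # one element: the path
        | 'Read with Pandas' >> beam.FlatMap(read_rows)     # one element per row
        | 'Format' >> beam.Map(format_row)
        | 'Write' >> beam.io.WriteToText(output_prefix)
    )
```

You would call build_pipeline(p, 'gs://tegclorox/Input/merge1.csv', 'gs://tegclorox/Output/merge1234') inside a `with beam.Pipeline() as p:` block. Note that the whole file is still read by a single worker here, since there is only one input element.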

In general, I am not sure why you would read the CSV file using Pandas at all. You can read it using Beam's ReadFromText with skip_header_lines=1, and then parse each line yourself. If you have a large amount of data, this will be a lot more efficient. If you have only a small amount of data and do not anticipate it becoming large enough to exceed the capabilities of a single machine (say, if it will never be above a few GB), then Beam is the wrong tool.
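
A sketch of that Pandas-free variant, assuming the file has a single header line and the five columns from the question. The names parse_line, to_json and build_pipeline are hypothetical helpers, and all parsed values stay strings unless you convert them yourself.

```python
import csv
import json

COLUMNS = ['first_name', 'last_name', 'age', 'preTestScore', 'postTestScore']


def parse_line(line):
    """Parse one CSV line into a dict keyed by column name (values stay strings)."""
    values = next(csv.reader([line]))  # csv handles quoted fields containing commas
    return dict(zip(COLUMNS, values))


def to_json(record):
    """Serialize a parsed record as one JSON line for WriteToText."""
    return json.dumps(record)


def build_pipeline(p, input_path, output_prefix):
    """Attach the read/parse/write steps to an existing pipeline `p`."""
    import apache_beam as beam
    return (
        p
        | 'Read' >> beam.io.ReadFromText(input_path, skip_header_lines=1)
        | 'Parse' >> beam.Map(parse_line)   # exactly one output per input line
        | 'To JSON' >> beam.Map(to_json)
        | 'Write' >> beam.io.WriteToText(output_prefix)
    )
```

Because ReadFromText can split the file across workers, this version scales with the input, whereas the Pandas approach loads everything on one machine. Parsing line by line does assume that no quoted field contains a newline.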

Upvotes: 2
