Reputation: 11
I am doing a POC for GCP Dataflow batch processing.
I want to pass a Pandas DataFrame as a batch input, perform a columnar transformation, and return the same batch again.
I referred to the MultiplyByTwo example provided at https://beam.apache.org/documentation/programming-guide/
When I input a Pandas DataFrame, the process_batch function is not executed.
Can you please let me know why, and if possible provide an example?
Code -

    pd = read_excel(path)
    result1 = (pd | "test batch" >> beam.ParDo(TestBatch(argv)))

DoFn class -

    class TestBatch(beam.DoFn):
        def __init__(self, args: Any):
            self.args = args
            print("TestBatch.__init__")

        def setup(self) -> None:
            print("TestBatch.setup")

        def process_batch(self, batch: pd.DataFrame) -> pd.DataFrame:
            print("TestBatch.process_batch")
            print(batch)
            yield batch
Upvotes: 0
Views: 263
Reputation: 407
I suggest you take a look at Beam DataFrames, e.g., https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_excel. This allows you to process the dataframes more efficiently.
For your case, this code example should work:
    import apache_beam as beam
    import pandas as pd

    df = pd.read_csv("beers.csv")

    class ProcessDf(beam.DoFn):
        def process(self, e: pd.DataFrame):
            print(e)
            yield e  # yield keeps the DataFrame as a single element

    with beam.Pipeline() as p:
        p | beam.Create([df]) | beam.ParDo(ProcessDf())
Note that a Beam pipeline has to start with a Pipeline object; in your snippet you pipe a plain DataFrame into the transform, so no pipeline ever runs and process_batch is never called.
https://beam.apache.org/get-started/ has more information. Tour of Beam is a great place to play with some basic concepts.
Upvotes: 0