PramodK

Reputation: 11

GCP Dataflow batch processing

I am doing POC for GCP Dataflow batch processing.

I want to pass a Pandas DataFrame as a batch input, perform a columnar transformation, and return the same batch again.

I referred to the MultiplyByTwo example in the link below: https://beam.apache.org/documentation/programming-guide/

When I pass in a Pandas DataFrame, the process_batch function is not executed.

Can you please let me know why, and if possible provide an example?

Code -

    pd = read_excel(path)

    result1 = ( pd | "test batch" >> beam.ParDo(TestBatch(argv)))

DoFn Class -

    class TestBatch(beam.DoFn):
        def __init__(self, args: Any):
            self.args = args
            print("TestBatch.__init__")

        def setup(self) -> None:
            print("TestBatch.setup")

        def process_batch(self, batch: pd.DataFrame) -> pd.DataFrame:
            print("TestBatch.process_batch")
            print(batch)
            yield batch

Upvotes: 0

Views: 263

Answers (1)

XQ Hu

Reputation: 407

I suggest you take a look at Beam DataFrames, e.g., https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_excel. This allows you to process the dataframes more efficiently.

For your case, this code example should work:

import apache_beam as beam
import pandas as pd

df = pd.read_csv("beers.csv")


class ProcessDf(beam.DoFn):
    def process(self, e: pd.DataFrame):
        print(e)
        # Yield (not return) the DataFrame so Beam emits it as a single
        # element; returning it would make Beam iterate over it instead.
        yield e


with beam.Pipeline() as p:
    p | beam.Create([df]) | beam.ParDo(ProcessDf())

The Beam pipeline needs to start with a `Pipeline` object, and the in-memory DataFrame has to be turned into a PCollection (here via `beam.Create`) before `ParDo` can be applied to it.

https://beam.apache.org/get-started/ has more information. Tour of Beam is a great place to play with some basic concepts.

Upvotes: 0
