Sandeep Mohanty

Renaming columns and creating new columns using Apache Beam

I have a CSV file which has 2 columns named first_name and last_name.

I am using Apache Beam (the Dataflow SDK) with the DirectRunner.

My use case is to first rename the columns to name and surname, and then use a PTransform to concatenate name and surname into a new column, employee_name.

Code:

import apache_beam as beam

p2 = beam.Pipeline()

def splitrow(element):
  # Split one CSV line into its fields: [first_name, last_name].
  return element.split(',')

demodata0 = (
    p2
    | beam.io.ReadFromText('gs://demo/MOCK_DATA.csv')
    | beam.Map(splitrow)
    # Concatenate first_name and last_name with a space.
    | beam.Map(lambda element: element[0] + " " + element[1])
    | beam.io.WriteToText('gs://demo/temp/output2')
)

p2.run()

Input table:

first_name   last_name
John         Miller
Smith        scott

Output table:

name    surname   employee_name
John    Miller    John Miller
Smith   scott     Smith scott

Thanks

Answers (2)

alift-advantage

Creating your own DoFn is great when you have complex logic and need to do some heavy lifting. If you only need to select some columns and the new fields have simple definitions, as is the case here, you can use beam.Select(), which produces schema'd rows (beam.Row) with the field names you choose.

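# Copyright 2022 Google LLC.
# SPDX-License-Identifier: Apache-2.0

import apache_beam as beam

p2 = beam.Pipeline()

def splitrow(element):
  # Split one CSV line into [first_name, last_name].
  return element.split(',')

demodata0 = (
    p2
    # skip_header_lines=1 drops the "first_name,last_name" header row.
    | beam.io.ReadFromText('gs://demo/MOCK_DATA.csv', skip_header_lines=1)
    | beam.Map(splitrow)
    # Build a beam.Row with the renamed columns plus the new employee_name.
    | beam.Select(name=lambda element: element[0],
                  surname=lambda element: element[1],
                  employee_name=lambda element: element[0] + " " + element[1])
    # Format each Row as a CSV line; otherwise WriteToText writes str(Row).
    | beam.Map(lambda row: ','.join([row.name, row.surname, row.employee_name]))
    | beam.io.WriteToText('gs://demo/temp/output2',
                          header='name,surname,employee_name')
)

p2.run().wait_until_finish()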

CaptainNabla

I have never worked with CSV files in Beam before, but I would suggest using a custom DoFn applied with beam.ParDo. It would look something like this:

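import apache_beam as beam

class EnrichCsvData(beam.DoFn):
  def process(self, element):
    # element is the list produced by splitrow, e.g. ['John', 'Miller'].
    # ReadFromText yields strings, so the split fields are strings too.
    row = {}
    row["name"] = element[0]
    row["surname"] = element[1]
    row["employee_name"] = element[0] + " " + element[1]
    # process() must produce an iterable of outputs; yield emits one dict.
    # Returning the dict directly would make Beam emit its keys instead.
    yield row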

and then call it in your pipeline:

(
    p2
    # skip_header_lines=1 keeps the header row out of EnrichCsvData.
    | beam.io.ReadFromText('gs://demo/MOCK_DATA.csv', skip_header_lines=1)
    | beam.Map(splitrow)
    | beam.ParDo(EnrichCsvData())
    | ...
)
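A short plain-Python sketch of why a DoFn's process() should yield its output dict (or return it wrapped in a list) rather than return the dict itself: the runner iterates over whatever process() returns and emits each item, and iterating a dict produces its keys. emit_all, process_returning_dict, and process_yielding_dict are hypothetical names that model this behavior without Beam.

```python
# Plain-Python model of how a runner consumes DoFn.process():
# it iterates over whatever process() returns and emits each item.


def emit_all(process_fn, element):
  """Hypothetical helper: collect the outputs a runner would emit."""
  return list(process_fn(element))


def process_returning_dict(element):
  # Buggy pattern: returns the dict itself, so iteration yields its keys.
  return {"name": element[0], "surname": element[1],
          "employee_name": element[0] + " " + element[1]}


def process_yielding_dict(element):
  # Fixed pattern: yields exactly one dict per input element.
  yield {"name": element[0], "surname": element[1],
         "employee_name": element[0] + " " + element[1]}


print(emit_all(process_returning_dict, ["John", "Miller"]))
# ['name', 'surname', 'employee_name']
print(emit_all(process_yielding_dict, ["John", "Miller"]))
# [{'name': 'John', 'surname': 'Miller', 'employee_name': 'John Miller'}]
```

The same pitfall applies to returning a single string from process(), which would be emitted one character at a time.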
