I have a CSV file with two columns named first_name and last_name.
I am using Dataflow (Apache Beam) with the DirectRunner.
My use case is to first rename the columns to name and surname, and then use a PTransform to concatenate name and surname into a new column called employee_name.
Code:
```python
import apache_beam as beam

p2 = beam.Pipeline()

def splitrow(element):
    return element.split(',')

demodata0 = (
    p2
    | beam.io.ReadFromText('gs://demo/MOCK_DATA.csv')
    | beam.Map(splitrow)
    | beam.Map(lambda element: element[0] + " " + element[1])
    | beam.io.WriteToText('gs://demo/temp/output2')
)
p2.run()
```
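splitrow above uses str.split(','), so a quoted field that contains a comma (for example "Smith, Jr.") would be split into two values. If the real file can contain quoted fields (an assumption; the sample rows do not show any), a minimal sketch of a quote-aware replacement using Python's standard csv module is below. The name split_csv_line is hypothetical; it would be used as beam.Map(split_csv_line) in place of beam.Map(splitrow).

```python
import csv


def split_csv_line(line):
    """Parse one CSV line into a list of fields, honouring quoted values.

    Hypothetical drop-in replacement for splitrow.
    """
    # csv.reader accepts any iterable of lines, so wrap the single line in a list
    return next(csv.reader([line]))


print(split_csv_line('John,Miller'))         # ['John', 'Miller']
print(split_csv_line('"Smith, Jr.",scott'))  # ['Smith, Jr.', 'scott']
```

This still assumes no field contains an embedded newline, since ReadFromText hands the function one line at a time.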
Input table:

| first_name | last_name |
|---|---|
| John | Miller |
| Smith | scott |

Output table:

| name | surname | employee_name |
|---|---|---|
| john | Miller | John Miller |
| Smith | Scott | smith Scott |
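Per row, the two tables describe a rename plus one derived field. A minimal plain-Python sketch of that mapping follows. to_employee is a hypothetical name, and it assumes values are copied through unchanged (the casing differences in the output table are not modelled). In a pipeline it could be applied with beam.Map(to_employee) after the split step, producing one dict per row.

```python
def to_employee(fields):
    """Map [first_name, last_name] to a dict with renamed and derived columns."""
    first_name, last_name = fields[0], fields[1]
    return {
        "name": first_name,                              # renamed from first_name
        "surname": last_name,                            # renamed from last_name
        "employee_name": first_name + " " + last_name,   # new concatenated column
    }


print(to_employee(["John", "Miller"]))
# {'name': 'John', 'surname': 'Miller', 'employee_name': 'John Miller'}
```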
Thanks
Creating your own DoFn is great when you have complex logic and need to do some heavy lifting. If you just need to select some columns, give them names, and the definitions are relatively simple, as is the case here, you can use beam.Select(), which produces rows with a schema.
```python
# Copyright 2022 Google LLC.
# SPDX-License-Identifier: Apache-2.0
import apache_beam as beam

p2 = beam.Pipeline()

def splitrow(element):
    return element.split(',')

demodata0 = (
    p2
    # skip_header_lines=1 keeps the "first_name,last_name" header out of the data
    | beam.io.ReadFromText('gs://demo/MOCK_DATA.csv', skip_header_lines=1)
    | beam.Map(splitrow)
    # Each keyword argument becomes a named field of the output beam.Row
    | beam.Select(name=lambda element: element[0],
                  surname=lambda element: element[1],
                  employee_name=lambda element: element[0] + " " + element[1])
    | beam.io.WriteToText('gs://demo/temp/output2')
)
p2.run()
```
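beam.Select emits beam.Row elements, and WriteToText writes each element's string form, so the output files will contain Row(...) text rather than CSV lines. If CSV output is wanted, one option (a sketch under assumptions, not part of the original answer) is to format each row before writing. row_to_csv_line is a hypothetical helper that assumes the row's fields are named name, surname and employee_name (the names the question asks for); SimpleNamespace stands in for a beam.Row so the sketch runs without Beam.

```python
import csv
import io
from types import SimpleNamespace


def row_to_csv_line(row):
    """Format a row with name/surname/employee_name attributes as one CSV line."""
    buf = io.StringIO()
    # csv.writer quotes fields that contain commas or quotes (QUOTE_MINIMAL)
    csv.writer(buf).writerow([row.name, row.surname, row.employee_name])
    # Drop the writer's "\r\n" terminator; WriteToText appends its own newline
    return buf.getvalue().rstrip("\r\n")


# SimpleNamespace stands in for a beam.Row so this runs without Beam installed
example = SimpleNamespace(name="John", surname="Miller", employee_name="John Miller")
print(row_to_csv_line(example))  # John,Miller,John Miller
```

In the pipeline this would sit between the Select step and the write, e.g. beam.Map(row_to_csv_line) followed by beam.io.WriteToText('gs://demo/temp/output2', header='name,surname,employee_name'), where header writes the column names at the top of each output file.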
I have never worked with CSV files in Beam before, but I would suggest using a custom DoFn (see here). It would look something like this:
```python
import apache_beam as beam

class EnrichCsvData(beam.DoFn):
    def process(self, element):
        # element is the list of strings produced by splitrow
        output_row = {}
        output_row["name"] = element[0]
        output_row["surname"] = element[1]
        output_row["employee_name"] = element[0] + " " + element[1]
        # process() must return an iterable of outputs; yield emits the dict
        # itself, whereas `return output_row` would emit its keys one by one
        yield output_row
```
And then call it in your pipeline:

```python
demodata0 = (
    p2
    | beam.io.ReadFromText('gs://demo/MOCK_DATA.csv')
    | beam.Map(splitrow)
    | beam.ParDo(EnrichCsvData())
    | ...  # remaining steps, e.g. a write
)
```
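Beam iterates over whatever a DoFn's process method returns and emits each item as an output element. A dict iterates over its keys, so returning one from process emits the key strings rather than the dict; yield output_row (or return [output_row]) emits the dict itself. The plain-Python sketch below shows the difference without Beam: the two hypothetical functions mirror the two styles, and list() conceptually stands in for the runner consuming the result.

```python
def process_returning_dict(element):
    # Mirrors `return output_row`: the dict itself is the return value
    return {"name": element[0], "surname": element[1],
            "employee_name": element[0] + " " + element[1]}


def process_yielding_dict(element):
    # Mirrors `yield output_row`: a generator that produces exactly one dict
    yield {"name": element[0], "surname": element[1],
           "employee_name": element[0] + " " + element[1]}


# Iterating the result, as a runner would
print(list(process_returning_dict(["John", "Miller"])))
# ['name', 'surname', 'employee_name']  <- only the keys
print(list(process_yielding_dict(["John", "Miller"])))
# [{'name': 'John', 'surname': 'Miller', 'employee_name': 'John Miller'}]
```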