Sruthi Satish

Reputation: 23

How to split the result of the GroupByKey() transform based on key and write the values into a GCS bucket using Apache Beam Python?

I am new to Apache Beam, Dataflow and Python, and any help would be appreciated. I have a requirement where I need to generate reports by fetching records from a BigQuery table and writing the results into a GCS bucket using Apache Beam in Python. I wrote the pipeline as below:

# Here I am converting the BigQuery output to a 2-element tuple whose elements are dictionaries, for example:
({'institution_id': '100'}, {'customer_id': '1000', 'customer_name': 'ABC', 'customer_email': '[email protected]', 'phone_number': '00012345'})

class convtotupleofdict(beam.DoFn):
    def process(self, element):
        # Emit a (key, value) pair: the key identifies the institution,
        # the value holds the remaining customer fields
        return [({'institution_id': element['institution_id']},
                 {'customer_id': element['customer_id'],
                  'customer_name': element['customer_name'],
                  'customer_email': element['customer_email'],
                  'phone_number': element['phone_number']})]

with beam.Pipeline(options=pipeline_options) as p:
    csv_ip = (p
              | 'ReadfromBQ' >> beam.io.ReadFromBigQuery(
                  query="SELECT institution_id, customer_id, customer_name, customer_email, phone_number "
                        "FROM <table name> WHERE customer_status = 'Active' "
                        "ORDER BY institution_id, customer_id",
                  use_standard_sql=True)
              | 'ConvttoTupleofDict' >> beam.ParDo(convtotupleofdict())
              | 'Groupbyinstitution_id' >> beam.GroupByKey())

    op_gcs = (csv_ip
              | 'WritetoGCS' >> beam.io.fileio.WriteToFiles(
                  path='gs://my-bucket/reports',
                  sink=lambda dest: beam.io.fileio.TextSink()))

I am using the GroupByKey() transform to group the data by institution_id, so that I can split it and create a separate file for each institution_id in the GCS bucket. The GroupByKey() output is as follows:

({'institution_id': '100'}, [{'customer_id': '1000', 'customer_name': 'ABC', 'customer_email': '[email protected]', 'phone_number': '00012345'}, {'customer_id': '2000', 'customer_name': 'XYZ', 'customer_email': '[email protected]', 'phone_number': '12378'}])
({'institution_id': '200'}, [{'customer_id': '3000', 'customer_name': 'MNO', 'customer_email': '[email protected]', 'phone_number': '789102'}, {'customer_id': '4000', 'customer_name': 'PQR', 'customer_email': '[email protected]', 'phone_number': '123789'}])

Now I am struggling to convert the GroupByKey() output to a CSV file and upload it to the GCS bucket. I learned about beam.io.fileio.WriteToFiles from https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html, since it can be used for writing files to dynamic destinations. In order to split the data by institution_id, how should I provide the following parameters of WriteToFiles: path, destination, sink and file_naming? I understand that destination and sink are callables, but I am not able to build them, and I am getting confused between the destination and sink parameters. How should I write them to split the data by institution_id and generate a CSV file? For now, I am testing my code with DirectRunner.
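For reference, here is my rough understanding of the relevant parameters from the linked documentation (please correct me if this is wrong):

# path        - the directory to write files to, e.g. 'gs://my-bucket/reports'
# destination - callable(record) -> destination value; records with the same
#               destination are routed to the same file(s)
# sink        - callable(destination) -> a FileSink (e.g. TextSink) that
#               serializes each record into the open file
# file_naming - callable that builds the output file name; the destination is
#               passed to it as one of its arguments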

Upvotes: 2

Views: 943

Answers (2)

Mazlum Tosun

Reputation: 6572

I propose a complete solution for your need; I hope it helps.

Beam main.py file:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from your_root_folder.file_io_customer_by_institution_transform import \
    FileIOCustomerByInstitutionTransform


def run():
    pipeline_options = PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as p:
        # Mocked customer records (in a real pipeline these would come from BigQuery)
        customers = [
            {
                'institution_id': '100',
                'customer_id': '1000',
                'customer_name': 'ABC',
                'customer_email': '[email protected]',
                'phone_number': '00012345'
            },
            {
                'institution_id': '100',
                'customer_id': '1001',
                'customer_name': 'ABCD',
                'customer_email': '[email protected]',
                'phone_number': '00012346'
            },
            {
                'institution_id': '101',
                'customer_id': '1001',
                'customer_name': 'ABCD',
                'customer_email': '[email protected]',
                'phone_number': '00012346'
            }
        ]

        (
                p
                | beam.Create(customers)
                | "Group customers by institution" >> beam.GroupBy(lambda customer: customer['institution_id'])
                | "Write file to GCS" >> FileIOCustomerByInstitutionTransform('gs://mazlum_dev/dynamicfiles')
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

job_config.py file:

class JobConfig:
    FILE_ENCODING = 'utf-8'
    FILE_NAME_TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
    CSV_SEPARATOR = ','

file_io_customer_by_institution_transform.py file:

from datetime import datetime
from typing import List, Dict, Tuple, Iterable

import apache_beam as beam
from apache_beam.io.fileio import TextSink, WriteToFiles
from pytz import timezone

from your_root_folder.job_config import JobConfig


class InstitutionDestinationParamError(Exception):
    pass


class FileIOCustomerByInstitutionTransform(beam.PTransform):

    def __init__(self, output_path: str):
        super().__init__()
        self._output_path = output_path

    def expand(self, pcoll):
        return (
                pcoll
                | f"Write files to GCS" >>
                WriteToFiles(
                    path=self._output_path,
                    destination=to_institution_destination,
                    file_naming=self.build_file_name,
                    sink=lambda institution_dest: CustomCsvSink())
        )

    def build_file_name(self, *args) -> str:
        """
        Build the file name dynamically from the parameters given by Beam to the
        file_naming callable of the 'WriteToFiles' PTransform. The destination is
        built from the institution (the key of each group in the PCollection), and
        the file name is then built from that destination, which Beam passes as the
        sixth positional argument (args[5]).
        """
        file_name_timestamp = datetime.now(timezone('Europe/Paris')).strftime(JobConfig.FILE_NAME_TIMESTAMP_FORMAT)
        try:
            institution_destination: str = args[5]

            return f'CUSTOMER_INSTITUTION_{institution_destination}_{file_name_timestamp}.csv'
        except IndexError as err:
            raise InstitutionDestinationParamError('The institution destination param must be passed') from err


class CustomCsvSink(TextSink):

    def __init__(self):
        super().__init__()

    def write(self, customers_with_institution):
        customers: Iterable[Dict[str, str]] = customers_with_institution[1]

        for index, customer in enumerate(customers, start=1):
            if index == 1:
                header_field_names: bytes = self.build_csv_header_file(customer)
                self._fh.write(header_field_names)
                self._fh.write(self.get_csv_line_break())

            customer_csv_entry = self.convert_dict_to_csv_record(customer)

            self._fh.write(customer_csv_entry)
            self._fh.write(self.get_csv_line_break())

    def get_csv_line_break(self) -> bytes:
        return '\n'.encode(JobConfig.FILE_ENCODING)

    def build_csv_header_file(self, customer_dict: Dict[str, str]) -> bytes:
        header_field_names: str = JobConfig.CSV_SEPARATOR.join(customer_dict.keys())

        return header_field_names.encode(JobConfig.FILE_ENCODING)

    def convert_dict_to_csv_record(self, customer_dict: Dict[str, str]) -> bytes:
        """
        Turns dictionary values into a comma-separated value formatted string
        The separator is added to a configuration file
        """
        customer_csv_record: str = JobConfig.CSV_SEPARATOR.join(map(str, customer_dict.values()))

        return customer_csv_record.encode(JobConfig.FILE_ENCODING)


def to_institution_destination(customers_with_institution: Tuple[str, List[Dict[str, str]]]) -> str:
    """
    Map the given tuple to the institution as destination in 'WriteToFiles' PTransform.
    Then this destination can be used in the 'file_name' part.
    """
    return customers_with_institution[0]

Some explanations:

  • The input data is mocked in my example with customer data
  • The first operation in the main file is a Beam GroupBy on the institution_id field
  • For the next step, I created a separate file containing a PTransform with all the logic to write the dynamic files

file_io_customer_by_institution_transform.py file:

  • This file uses a small configuration held in the JobConfig object
  • A CSV sink is created to generate the CSV lines from the elements in the PCollection
  • The file name is generated from the current institution ID and the current timestamp (see the file_naming sketch after this list)
  • One file is generated per group of elements sharing the same institution ID
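For clarity, the positional arguments that Beam passes to a file_naming callable are (window, pane, shard_index, total_shards, compression, destination), which is why the destination is read at index 5 above. A minimal standalone equivalent of the build_file_name method (with the timestamp omitted for brevity):

def institution_file_naming(window, pane, shard_index, total_shards, compression, destination):
    # 'destination' is the value returned by to_institution_destination, i.e. the institution ID
    return f'CUSTOMER_INSTITUTION_{destination}.csv'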

In my example:

  • file 1 with institution 100 : CUSTOMER_INSTITUTION_100_20221011160828.csv => contains 2 CSV lines

  • file 2 with institution 101 : CUSTOMER_INSTITUTION_101_20221011160828.csv => contains 1 CSV line
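Concretely, with the mocked input above, the file for institution 100 would contain (the header comes from the dictionary keys, the rows from the values):

institution_id,customer_id,customer_name,customer_email,phone_number
100,1000,ABC,[email protected],00012345
100,1001,ABCD,[email protected],00012346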

To be honest, the Beam Python documentation is not complete for this kind of use case and the use of WriteToFiles.

Upvotes: 3

Bruno Volpato

Reputation: 1428

I think TextIO's WriteToText will be friendlier and capable of outputting CSVs with very few parameters (it is an abstraction on top of FileIO).

Here is what I suggest doing:

csv_ip | 'WritetoGCS' >> beam.io.WriteToText(
    'gs://{0}/reports'.format(BUCKET), file_name_suffix='.csv')
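Note that WriteToText writes one element per line, so the grouped tuples in csv_ip would need to be flattened into CSV strings first. A minimal sketch (the helper name and the column order here are my own assumptions):

def to_csv_lines(group):
    # group is a (key_dict, [customer dicts]) pair produced by GroupByKey
    _, customers = group
    for customer in customers:
        yield ','.join(str(customer[field]) for field in
                       ('customer_id', 'customer_name', 'customer_email', 'phone_number'))

csv_lines = csv_ip | 'ToCsvLines' >> beam.FlatMap(to_csv_lines)
csv_lines | 'WritetoGCS' >> beam.io.WriteToText(
    'gs://{0}/reports'.format(BUCKET), file_name_suffix='.csv')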

Note: I had missed the part where you want a dynamic file destination. If that is a requirement, I suggest checking Apache Beam DynamicDestinations Python equivalent.

Based on that example, it would be something along these lines:

csv_ip | 'WritetoGCS' >> beam.io.fileio.WriteToFiles(
      path='gs://{0}/reports/'.format(BUCKET),
      destination=lambda record: record['institution_id'],
      sink=lambda dest: CsvSink(),
      file_naming=beam.io.fileio.destination_prefix_naming())
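The CsvSink referenced above is not defined in the snippet. A minimal version modeled on apache_beam.io.fileio.FileSink might look like the following; it assumes the records are the flat customer dicts, as the destination lambda implies (with a destination callable, the upstream GroupByKey is not needed, because WriteToFiles itself routes records to files). The column order and encoding are my assumptions:

import apache_beam as beam

class CsvSink(beam.io.fileio.FileSink):
    def open(self, fh):
        self._fh = fh

    def write(self, record):
        # record is assumed to be one flat customer dict
        line = ','.join(str(record[field]) for field in
                        ('customer_id', 'customer_name', 'customer_email', 'phone_number'))
        self._fh.write((line + '\n').encode('utf-8'))

    def flush(self):
        self._fh.flush()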

Upvotes: 0
