Vibhor Gupta

Reputation: 699

How to create a partitioned BigQuery table at runtime with Apache Beam in Python

I am trying to create a new partitioned BigQuery table at runtime with the following code, but I am not finding an option to pass the column name "_time" on which the new BQ table should be partitioned. Can anyone please help me with this?

My Code

#------------Import Lib-----------------------#
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
import os, sys
import argparse
import json  # needed for json.loads() in the DoFn below
import logging
from datetime import datetime

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxx'

#------------Splitting of Records-----------------------#

class Transaction_DB_UC2(beam.DoFn):
    def process(self, element):
        logging.info(element)

        result = json.loads(element)

        data_time = result.get('_time', 'null')
        data_dest = result.get('dest', 'null')
        data_DBID = result.get('DBID', 'null')
        data_SESSIONID = result.get('SESSIONID', 'null')
        data_USERHOST = result.get('USERHOST', 'null')
        data_raw = result.get('_raw', 'null')
        data_ACTION = result.get('ACTION', 'null')
        data_host = result.get('host', 'null')
        data_result = result.get('result', 'null')
        data_DBUSER = result.get('DBUSER', 'null')
        data_OS_USERNAME = result.get('OS_USERNAME', 'null')
        data_ACTION_NAME = result.get('ACTION', 'null').replace('100','LOGON').replace('101','LOGOFF')

        return [{"_time": data_time[:-8], "dest": data_dest,  "DBID": data_DBID,  "SESSIONID": data_SESSIONID,  "_raw": data_raw,  "USERHOST": data_USERHOST,  "ACTION": data_ACTION, "host": data_host, "result": data_result, "DBUSER": data_DBUSER, "OS_USERNAME": data_OS_USERNAME, "ACTION_NAME": data_ACTION_NAME}]

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
          '--input',
          dest='input',
          help='Input file to process.')
    parser.add_argument(
        '--pro_id',
        dest='pro_id',
        type=str,
        default='ORACLE_SEC_DEFAULT',
        help='project id')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)

    #data_f = sys.argv[1]
    logging.info('***********')
    logging.info(known_args.input)
    data_loading = (
        p1
        |'Read from File' >> beam.io.ReadFromText(known_args.input,skip_header_lines=0)
    )


    project_id = "xxxxx"
    dataset_id = 'test123'
    table_schema_DB_UC2 = ('_time:DATETIME, dest:STRING, DBID:STRING, SESSIONID:STRING, _raw:STRING, USERHOST:STRING, ACTION:STRING, host:STRING, result:STRING, DBUSER:STRING, OS_USERNAME:STRING, ACTION_NAME:STRING')

    # Persist to BigQuery
    # WriteToBigQuery accepts the data as a list of JSON objects

#---------------------Index = DB-UC2----------------------------------------------------------------------------------------------------------------------
    result = (
        data_loading
        | 'Clean-DB-UC2' >> beam.ParDo(Transaction_DB_UC2())
        | 'Write-DB-UC2' >> beam.io.WriteToBigQuery(
            table=known_args.pro_id,
            dataset=dataset_id,
            project=project_id,
            schema=table_schema_DB_UC2,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  #logging.getLogger().setLevel(logging.INFO)
  path_service_account = 'ml-fbf8cabcder.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()

I want to create a partition on the field "_time"; please suggest how it can be achieved. Thanks.

Upvotes: 1

Views: 1384

Answers (1)

Ksign

Reputation: 817

I believe you can do that by passing the timePartitioning parameter through additional_bq_parameters (note the limitations).

When creating a new BigQuery table, there are a number of extra parameters that one may need to specify. For example, clustering, partitioning, data encoding, etc. It is possible to provide these additional parameters by passing a Python dictionary as additional_bq_parameters (Reference).

In your case, you could add to your WriteToBigQuery transform the timePartitioning parameter with the required type and the optional field keys (note that field must be a top-level TIMESTAMP, DATE, or DATETIME column):

additional_bq_parameters={'timePartitioning': {
        'type': 'DAY',
        'field': '_time'
    }}
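
For reference, wired into your existing write step it would look roughly like this (a sketch reusing the variable names from your own pipeline):

    result = (
        data_loading
        | 'Clean-DB-UC2' >> beam.ParDo(Transaction_DB_UC2())
        | 'Write-DB-UC2' >> beam.io.WriteToBigQuery(
            table=known_args.pro_id,
            dataset=dataset_id,
            project=project_id,
            schema=table_schema_DB_UC2,
            # Request day-based partitioning on _time when the table is created
            additional_bq_parameters={
                'timePartitioning': {'type': 'DAY', 'field': '_time'}},
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))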

I haven't had the time to try it out yet; I'll try to reproduce it tomorrow.
Let me know if it works for you.

EDIT

Finally got the chance to try the timePartitioning parameter to create a partitioned table, and it worked.
Here is a simple pipeline to test it.

#!/usr/bin/env python

import apache_beam as beam

PROJECT='YOUR_PROJECT'
BUCKET='YOUR_BUCKET'

def run():
    argv = [
          '--project={0}'.format(PROJECT),
          '--job_name=YOUR_JOB_NAME',
          '--save_main_session',
          '--staging_location=gs://{0}/staging/'.format(BUCKET),
          '--temp_location=gs://{0}/staging/'.format(BUCKET),
          '--region=us-central1',
          '--runner=DataflowRunner'
    ]

    p = beam.Pipeline(argv=argv)

    table_schema = {'fields': [
        {'name': 'country', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': '_time', 'type': 'DATETIME', 'mode': 'NULLABLE'},
        {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'}]}

    # Ask BigQuery to create the table partitioned by day on the _time column
    additional_bq_parameters = {
            'timePartitioning': {'type': 'DAY', 'field': '_time'}}

    elements = (p | beam.Create([
            {'country': 'mexico', '_time': '2020-06-10 22:19:26', 'query': 'acapulco'},
            {'country': 'canada', '_time': '2020-12-11 15:42:32', 'query': 'influenza'},
        ]))

    elements | beam.io.WriteToBigQuery(
        table='YOUR_DATASET.YOUR_NEW_TABLE',
        schema=table_schema,
        additional_bq_parameters=additional_bq_parameters,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
    p.run()

if __name__ == '__main__':
    run()
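
After the job completes, you can verify that the table came out partitioned. Here is a minimal check with the google-cloud-bigquery client library (assuming it is installed; the project and table names are the same placeholders as above):

from google.cloud import bigquery

# Fetch the table metadata and inspect its partitioning spec
client = bigquery.Client(project='YOUR_PROJECT')
table = client.get_table('YOUR_DATASET.YOUR_NEW_TABLE')
print(table.time_partitioning)  # expect something like TimePartitioning(type_='DAY', field='_time')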

Upvotes: 1
