Reputation: 699
I am trying to create a new partitioned BigQuery table at runtime with the following code, but I cannot find an option to pass the column name "_time" on which my new BQ table should be partitioned. Can anyone please help me with this?
My Code
#------------Import Lib-----------------------#
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os, sys
import argparse
import logging
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime
#------------Set up BQ parameters-----------------------#
# Replace with Project Id
# NOTE(review): `project` is not referenced by any other code visible in this
# script; run() hard-codes its own project_id. Confirm whether it is needed.
project = 'xxxx'
#------------Splitting Of Records----------------------#
class Transaction_DB_UC2(beam.DoFn):
    """Turn one JSON-encoded text line into a flat row dict for BigQuery."""

    def process(self, element):
        """Parse ``element`` as JSON and emit a single output row.

        Args:
            element: One line of input text containing a JSON object.

        Returns:
            A one-element list holding the row dict. Any key missing from
            the JSON object is filled with the string ``'null'``.

        Raises:
            ValueError: If ``element`` is not valid JSON (raised by
                ``json.loads``).
        """
        # The script never imports json at module level, so json.loads()
        # raised NameError for every element. Import it here so this block
        # works on its own.
        import json

        logging.info(element)
        record = json.loads(element)

        # The last 8 characters of the timestamp are dropped.
        # NOTE(review): presumably a fractional-second/zone suffix that
        # DATETIME cannot take - confirm against real input. A missing
        # `_time` ('null') becomes an empty string after this slice.
        row = {"_time": record.get('_time', 'null')[:-8]}

        # Columns copied through unchanged, in the original output order.
        for column in ('dest', 'DBID', 'SESSIONID', '_raw', 'USERHOST',
                       'ACTION', 'host', 'result', 'DBUSER', 'OS_USERNAME'):
            row[column] = record.get(column, 'null')

        # Human-readable form of the action code.
        row["ACTION_NAME"] = (
            record.get('ACTION', 'null')
            .replace('100', 'LOGON')
            .replace('101', 'LOGOFF'))

        return [row]
def run(argv=None, save_main_session=True):
    """Build and run the pipeline: read text lines, clean them, write to BigQuery.

    Args:
        argv: Command-line arguments handed to ``parse_known_args``; arguments
            the parser does not recognise are forwarded to ``PipelineOptions``.
        save_main_session: Value stored in ``SetupOptions.save_main_session``.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--input', dest='input', help='Input file to process.')
    # NOTE: despite its help text, --pro_id is passed below as the
    # destination *table* name.
    arg_parser.add_argument(
        '--pro_id', dest='pro_id', type=str,
        default='ORACLE_SEC_DEFAULT', help='project id')
    known_args, pipeline_args = arg_parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline = beam.Pipeline(options=options)

    logging.info('***********')
    logging.info(known_args.input)

    raw_lines = pipeline | 'Read from File' >> beam.io.ReadFromText(
        known_args.input, skip_header_lines=0)

    project_id = "xxxxx"
    dataset_id = 'test123'
    table_schema_DB_UC2 = (
        '_time:DATETIME, dest:STRING, DBID:STRING, SESSIONID:STRING, '
        '_raw:STRING, USERHOST:STRING, ACTION:STRING, host:STRING, '
        'result:STRING, DBUSER:STRING, OS_USERNAME:STRING, '
        'ACTION_NAME:STRING')

    # ---- Index = DB-UC2 ----
    # Each parsed record is a dict, which is the row format given to
    # WriteToBigQuery.
    cleaned_rows = raw_lines | 'Clean-DB-UC2' >> beam.ParDo(
        Transaction_DB_UC2())
    cleaned_rows | 'Write-DB-UC2' >> beam.io.WriteToBigQuery(
        table=known_args.pro_id,
        dataset=dataset_id,
        project=project_id,
        schema=table_schema_DB_UC2,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    # Execute the pipeline and block until it finishes.
    job = pipeline.run()
    job.wait_until_finish()
if __name__ == '__main__':
    # logging.getLogger().setLevel(logging.INFO)
    # Export the service-account key file path via
    # GOOGLE_APPLICATION_CREDENTIALS before the pipeline is built.
    service_account_key = 'ml-fbf8cabcder.json'
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_key
    run()
I want to partition the table on the field "_time"; please suggest how this can be achieved. Thanks.
Upvotes: 1
Views: 1384
Reputation: 817
I believe you can do that with `additional_bq_parameters` (note the limitations), using the `timePartitioning` parameter.
When creating a new BigQuery table, there are a number of extra parameters that one may need to specify, for example clustering, partitioning, data encoding, etc. It is possible to provide these additional parameters by passing a Python dictionary as `additional_bq_parameters` (Reference).
In your case, you could add the `timePartitioning` parameter to your `WriteToBigQuery` transform, with the required `type` field and the optional `field` field (note that `field` must be a top-level TIMESTAMP or DATE field):
# Keyword argument for WriteToBigQuery: requests time partitioning of
# type DAY on the `_time` column.
additional_bq_parameters={'timePartitioning': {
    'type': 'DAY',
    'field': '_time'
}}
I didn't have the time to try it out yet. I'll try to reproduce tomorrow.
Let me know if it works for you.
I finally got the chance to try the `timePartitioning` parameter to create a partitioned table, and it worked.
Here is a simple pipeline code to test it.
#!/usr/bin/env python
import apache_beam as beam
# Placeholders to fill in: PROJECT is passed as --project, and BUCKET is the
# GCS bucket used for the --staging_location and --temp_location paths.
PROJECT='YOUR_PROJECT'
BUCKET='YOUR_BUCKET'
def run():
    """Run a small test pipeline that writes two rows to a partitioned table.

    The table is created if needed, with time partitioning of type DAY on
    the ``_time`` column, and any existing contents are truncated.
    """
    # Staging and temp files share the same GCS folder.
    staging_uri = 'gs://{0}/staging/'.format(BUCKET)
    pipeline_args = [
        '--project={0}'.format(PROJECT),
        '--job_name=YOUR_JOB_NAME',
        '--save_main_session',
        '--staging_location=' + staging_uri,
        '--temp_location=' + staging_uri,
        '--region=us-central1',
        '--runner=DataflowRunner',
    ]
    pipeline = beam.Pipeline(argv=pipeline_args)

    schema_fields = [
        {'name': 'country', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': '_time', 'type': 'DATETIME', 'mode': 'NULLABLE'},
        {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]
    table_schema = {'fields': schema_fields}

    # Extra table-creation settings: partition by day on `_time`.
    partitioning = {'type': 'DAY', 'field': '_time'}
    additional_bq_parameters = {'timePartitioning': partitioning}

    sample_rows = [
        {'country': 'mexico', '_time': '2020-06-10 22:19:26',
         'query': 'acapulco'},
        {'country': 'canada', '_time': '2020-12-11 15:42:32',
         'query': 'influenza'},
    ]
    elements = pipeline | beam.Create(sample_rows)

    elements | beam.io.WriteToBigQuery(
        table='YOUR_DATASET.YOUR_NEW_TABLE',
        schema=table_schema,
        additional_bq_parameters=additional_bq_parameters,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )

    pipeline.run()
# Launch the test pipeline only when this file is executed as a script.
if __name__ == '__main__':
    run()
Upvotes: 1