Thijs

Reputation: 1546

Importing and using Google Cloud packages

I've created a pipeline that spits out a list of numbers. Those numbers flow into a ParDo, and in the ParDo I query a BigQuery table using each number and return the query results.

This works locally (Linux, Python 3.7, google-cloud-bigquery 1.22.0).

When I upload the job to Dataflow, things get interesting: nothing I import or define at the top level is available inside the functions below. So I have to import every package I use inside every single function for it to be available.

This is so ugly that I suspect I'm doing something wrong. But what?

So I get a function like this:

def flatten(elements):
    import datetime
    for element in elements['transactionids']:
        print('flatten: ' + str(element) + ' ' + datetime.datetime.now().isoformat())
        yield element

And I have a DoFn class like this:

class TransformTransaction(beam.DoFn):
    def setup(self):
        print("This will never run. Why?")

    def start_bundle(self):
        print("Bundle Start")
        from google.cloud import bigquery
        self.client = bigquery.Client()
        self.dataset_id = 'mydataset'
        self.table_id = 'testhijs'
        self.table_ref = self.client.dataset(self.dataset_id).table(self.table_id)
        self.table = self.client.get_table(self.table_ref)

    def retrieveTransactionData(self, transactionID):
        query = f"select transactionID, someNr from `thijs-dev.thijsset.thijstable` " \
                f"where transactionID = {transactionID}"

        query_job = self.client.query(
            query,
            location="EU",
        )
        print(query_job)

        transactions = []
        for row in query_job:
            transactions.append(row)

        return transactions
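
For context, this is roughly how the DoFn is wired into the pipeline. The process method and the input values below are a simplified sketch, not my exact code:

import apache_beam as beam


class TransformTransactionWithProcess(TransformTransaction):
    # Illustrative process() method, only to show how retrieveTransactionData
    # gets called per element; it is not part of the class above.
    def process(self, element):
        for row in self.retrieveTransactionData(element):
            yield dict(row.items())


with beam.Pipeline() as p:
    (p
     | 'Input' >> beam.Create([{'transactionids': [1, 2, 3]}])
     | 'Flatten' >> beam.FlatMap(flatten)
     | 'Lookup' >> beam.ParDo(TransformTransactionWithProcess()))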

Upvotes: 2

Views: 205

Answers (1)

manesioz

Reputation: 837

Use the pipeline option --save_main_session. This causes the state of the global namespace (including your top-level imports) to be pickled and loaded on the Cloud Dataflow workers.

Full example in Python:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
          '--runner=DataflowRunner',
          '--project=proj',
          '--region=region',
          '--staging_location=gs://bucket/staging/',
          '--temp_location=gs://bucket/temp/',
          '--job_name=name',
          '--setup_file=./setup.py'
          ]) 

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True  # this is what you need to include

    with beam.Pipeline(options=pipeline_options) as p:
        pass  # build the rest of your pipeline here


if __name__ == '__main__':
    run()
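
With save_main_session set, anything imported at module level is shipped to the workers, so a DoFn can use those imports without repeating them in every method. A minimal sketch of what that looks like (the class and names are just illustrative):

import datetime

import apache_beam as beam
from google.cloud import bigquery  # top-level import, pickled with the main session


class LookupTransaction(beam.DoFn):
    def start_bundle(self):
        # bigquery is resolvable here because the main session was saved
        self.client = bigquery.Client()

    def process(self, element):
        print('lookup:', element, datetime.datetime.now().isoformat())
        yield element

You can also pass --save_main_session on the command line instead of setting it in code.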

edit: link to doc

Upvotes: 1
