Reputation: 699
I have a simple Google Cloud HTTP-triggered function which is responsible for triggering a Dataflow runner job that loads data from a CSV on Cloud Storage into a BigQuery table.
My code is given below:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import SetupOptions, PipelineOptions

PROJECT = 'proj'
BUCKET = 'BUCKET'
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT,ibu:STRING,brewery_id:STRING'
DATAFLOW_JOB_NAME = 'jobname'

def execute(request):
    # Options for running the pipeline on Dataflow in europe-west2
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name={0}'.format(DATAFLOW_JOB_NAME),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--region=europe-west2',
        '--runner=DataflowRunner'
    ]
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    input = 'gs://{0}/beers.csv'.format(BUCKET)
    print('step-222')

    # Read the CSV, split each row into fields, map them to a dict and write to BigQuery
    (p | 'ReadData' >> beam.io.ReadFromText(input, skip_header_lines=1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
             table='data',
             dataset='sandbox',
             project=PROJECT,
             schema=SCHEMA,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
       ))
    p.run()
    return "success"
The function runs successfully and it also creates a Dataflow job, but the Dataflow job fails within 40 seconds without creating the graph view.
It is giving this error:
Upvotes: 0
Views: 106
Reputation: 6582
As @captainnabla said in his comment, you have to create a subnetwork and pass it as an option to your Dataflow job.

In the default VPC of the project, create the subnetwork for Dataflow.
If you didn't specify a subnetwork, the Dataflow job usually uses the project's default VPC network. I don't know why this didn't work in your case (maybe the default network picked up by the job is outside of the project executing the job).
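For reference, here is a minimal sketch of creating such a subnetwork with the google-cloud-compute client library, assuming the default VPC, the europe-west2 region your job uses, and a placeholder subnet name and IP range (gcloud or the console work just as well):

from google.cloud import compute_v1

def create_dataflow_subnet(project_id, region='europe-west2',
                           subnet_name='dataflow-subnet', cidr='10.10.0.0/20'):
    # Subnet inside the project's default VPC; name and CIDR are placeholders.
    subnet = compute_v1.Subnetwork(
        name=subnet_name,
        ip_cidr_range=cidr,
        network='projects/{0}/global/networks/default'.format(project_id),
        private_ip_google_access=True,  # lets workers without public IPs reach Google APIs
    )
    # insert() returns a long-running operation; wait for it before launching the job.
    operation = compute_v1.SubnetworksClient().insert(
        project=project_id, region=region, subnetwork_resource=subnet)
    return operation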
Create another VPC for your data pipelines and a subnetwork for Dataflow.
The network config depends on your team's strategy.
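If you go for a dedicated VPC instead, a sketch of creating a custom-mode network with the same client library could look like this (the network name is a placeholder); you would then create the Dataflow subnetwork inside it exactly as above:

from google.cloud import compute_v1

# Custom-mode VPC: no automatic subnets, you create the Dataflow subnet yourself.
network = compute_v1.Network(name='data-pipelines-vpc', auto_create_subnetworks=False)
operation = compute_v1.NetworksClient().insert(project='proj', network_resource=network)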
In both solutions, you can pass the subnetwork as a program argument to your Dataflow job:
--subnetwork=https://www.googleapis.com/compute/v1/projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{SUBNETWORK}
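Applied to the argv list in your Cloud Function, that could look like the following (the subnetwork name dataflow-subnet is a placeholder for whichever subnet you created):

SUBNETWORK = 'dataflow-subnet'  # placeholder: the subnet you created for Dataflow

argv = [
    '--project={0}'.format(PROJECT),
    '--job_name={0}'.format(DATAFLOW_JOB_NAME),
    '--staging_location=gs://{0}/staging/'.format(BUCKET),
    '--temp_location=gs://{0}/staging/'.format(BUCKET),
    '--region=europe-west2',
    '--runner=DataflowRunner',
    # Pin the Dataflow workers to the explicit subnetwork:
    '--subnetwork=https://www.googleapis.com/compute/v1/projects/{0}/regions/europe-west2/subnetworks/{1}'.format(PROJECT, SUBNETWORK),
]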
Upvotes: 3