Reputation: 31
I am trying to implement a use case following the design explained in the link below, but I am running into an error. Any pointers would be of great help.
Use case explained step-wise:
Code snippet below:
```
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
# Assumed imports: CreateDisposition / WriteDisposition as aliases for beam.io.BigQueryDisposition.
from apache_beam.io.gcp.bigquery import BigQueryDisposition as CreateDisposition
from apache_beam.io.gcp.bigquery import BigQueryDisposition as WriteDisposition

p = beam.Pipeline(argv=argv)

# Read raw bytes from Pub/Sub and split the output into valid messages and errors.
valid_msgs, errors = (
    p
    | 'Read from Pubsub' >> beam.io.ReadFromPubSub(
        subscription=c['SUBSCRIPTION']).with_output_types(bytes)
    | 'Validate PubSub Event' >> beam.ParDo(ValidateMessages()).with_outputs('errors', main='valid')
)

filtered_events = (valid_msgs | 'Filter Events' >> beam.Filter(filter_msgs))
raw_events = (filtered_events | 'Prepare Raw Event Row for BQ' >> beam.Map(get_raw_values))

agg_events = (
    filtered_events
    | f'Streaming Window for {c["WINDOW_TIME"]} seconds' >> beam.WindowInto(window.FixedWindows(c['WINDOW_TIME']))
    | 'Event Parser' >> beam.Map(get_agg_values)
    | 'Event Aggregation' >> beam.CombinePerKey(sum)
    | 'Prepare Aggregate Event Row for BQ' >> beam.Map(get_count)
)

# Raw events are written to BigQuery using 'Load Jobs' every 10 minutes.
write_result_raw = (
    raw_events
    | 'Write Raw Events to BQ' >> beam.io.WriteToBigQuery(
        c["RAW_TABLE"],
        project=c["PROJECT"],
        dataset=c["DATASET_NAME"],
        method='FILE_LOADS',
        triggering_frequency=10)
)

# Aggregated events are written to BigQuery using 'Streaming Inserts'.
write_result_agg = (
    agg_events
    | 'Write Aggregate Results to BQ' >> beam.io.WriteToBigQuery(
        c["COUNT_TABLE"],
        project=c["PROJECT"],
        dataset=c["DATASET_NAME"],
        create_disposition=CreateDisposition.CREATE_NEVER,
        write_disposition=WriteDisposition.WRITE_APPEND,
        insert_retry_strategy=RetryStrategy.RETRY_ALWAYS)
)
```
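For completeness, the pipeline is then submitted roughly like this (simplified sketch; the exact result handling is omitted from the snippet above):

```
# Submit the streaming pipeline to the runner (DataflowRunner via argv).
result = p.run()
# For a streaming Dataflow job, run() returns once the job is submitted;
# result.wait_until_finish() would block until the job is drained or cancelled.
```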
Error:
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1493, in expand
42 'triggering_frequency can only be used with '
43ValueError: triggering_frequency can only be used with FILE_LOADS method of writing to BigQuery.
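As far as I understand, the choice between streaming inserts and file loads is gated on the experiments the SDK sees on the pipeline options, so a quick diagnostic is to log them before applying WriteToBigQuery. The logging below is a sketch I added for debugging, not part of the original pipeline:

```
import logging
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

# Sketch: log the experiments the SDK actually picks up from argv.
options = PipelineOptions(argv)
logging.info('Experiments seen by the SDK: %s',
             options.view_as(DebugOptions).experiments)
```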
Based on @Iñigo's response, I added the flag, but it did not work either. Please see the details below.
```
if c['FILE_LOAD']:
    argv.append('--experiments=use_beam_bq_sink')

p = beam.Pipeline(argv=argv)

records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(
    c["RAW_TABLE"],
    project=c["PROJECT"],
    dataset=c["DATASET_NAME"],
    method='FILE_LOADS',
    triggering_frequency=c['FILE_LOAD_FREQUENCY'],
    create_disposition=CreateDisposition.CREATE_NEVER,
    write_disposition=WriteDisposition.WRITE_APPEND,
    insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)
```
Error from the Dataflow job:
```
Workflow failed. Causes: Because of the shape of your pipeline, the Cloud Dataflow job optimizer produced a job graph that is not updatable using the --update pipeline option. This is a known issue that we are working to resolve. See https://issuetracker.google.com/issues/118375066 for information about how to modify the shape of your pipeline to avoid this error. You can override this error and force the submission of the job by specifying the --experiments=allow_non_updatable_job parameter., The stateful transform named 'Write Errors to BQ/BigQueryBatchFileLoads/ImpulseSingleElementPC/Map(decode).out/FromValue/ReadStream' is in two or more computations.
```
EDIT (08/11/2020):
Added both of the flags mentioned above as pipeline arguments.
```
INFO:root:Argument to Beam Pipeline:['--project=xxxxxx', '--runner=DataflowRunner', '--job_name=df-pubsub-raw', '--save_main_session', '--staging_location=gs:/staging/', '--temp_location=gs://temp/', '--network=dataflow-localnet', '--subnetwork=regions/us-central1/subnetworks/us-central1', '--region=us-central1', '[email protected]', '--no_use_public_ips', '--streaming', '--experiments=[allow_non_updatable_job, use_beam_bq_sink]']
INFO:root:File load enabled
INFO:root:Write using file load with frequency:5
26 File "./dataflow_ps_stream_bq.py", line 133, in stream_to_bq 27 write_disposition=WriteDisposition.WRITE_APPEND 28 File "/usr/local/lib/python3.6/site-packages/apache_beam/pvalue.py", line 141, in __or__ 29 return self.pipeline.apply(ptransform, self) 30 File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 610, in apply 31 transform.transform, pvalueish, label or transform.label) 32 File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 620, in apply 33 return self.apply(transform, pvalueish) 34 File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 663, in apply 35 pvalueish_result = self.runner.apply(transform, pvalueish, self._options) 36 File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 153, in apply 37 return super(DataflowRunner, self).apply(transform, input, options) 38 File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 198, in apply 39 return m(transform, input, options) 40 File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 228, in apply_PTransform 41 return transform.expand(input) 42 File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1493, in expand 43 'triggering_frequency can only be used with ' 44ValueError: triggering_frequency can only be used with FILE_LOADS method of writing to BigQuery. ```
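One thing I notice in the argv logged above is that both experiments end up inside a single bracketed string ('--experiments=[allow_non_updatable_job, use_beam_bq_sink]'). Since the SDK appends each --experiments flag as its own entry, that bracketed string may be what keeps use_beam_bq_sink from being recognized. A sketch of appending them individually instead (reusing the c['FILE_LOAD'] guard from my snippet):

```
# Sketch: each experiment as its own --experiments entry.
if c['FILE_LOAD']:
    argv.append('--experiments=use_beam_bq_sink')
argv.append('--experiments=allow_non_updatable_job')

p = beam.Pipeline(argv=argv)
```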
Upvotes: 2
Views: 483
Reputation: 2680
You need to add the flag --experiments use_beam_bq_sink. This has been an issue for a while now; Dataflow overrides the load type.
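For example, if you build the options in code as in your snippet, it could look roughly like the sketch below (adjust to your own option handling); --experiments can also simply be passed on the command line with the rest of your Dataflow options.

```
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: make sure the experiment reaches the pipeline options the BigQuery sink inspects.
options = PipelineOptions(argv + ['--experiments=use_beam_bq_sink'])
p = beam.Pipeline(options=options)
```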
You can see this in the Beam repo.
It also looks like there's an ongoing PR to improve the inserts, including this (I just skimmed over the code).
PS: Glad our blog post helped with coming up with ideas :D
Upvotes: 1