user2448446

Reputation: 31

Dataflow Job with mixed workload - Streaming Inserts & Load Jobs (Python)

I am trying to implement a use case following the design explained in the link below, but I am running into an error. Any pointers would be of great help.

https://cloud.google.com/blog/products/data-analytics/how-to-efficiently-process-both-real-time-and-aggregate-data-with-dataflow

Use case, explained step by step:

  1. Get streaming raw events from PubSub.
  2. Validate the raw events received.
  3. Filter specific types of events.
  4. Create a dictionary from the filtered events.
  5. At the same time, pass the filtered events through a windowing operation and aggregate them.
  6. Two types of output - a raw events dictionary and an aggregated events dictionary.
  7. Following the design explained in the link above, the raw events dictionary falls into the LowUrgency category and the aggregated events fall into the HighUrgency category.
  8. Trying the 'FILE_LOADS' method for raw events to keep costs down.
  9. Trying the 'STREAMING_INSERTS' method for aggregated events, as they need to be available in real time.

Code snippet below:

```
    p = beam.Pipeline(argv=argv)
    valid_msgs, errors = (p
                          | 'Read from Pubsub' >>
                          beam.io.ReadFromPubSub(subscription=c['SUBSCRIPTION']).with_output_types(bytes)
                          | 'Validate PubSub Event' >> beam.ParDo(ValidateMessages()).with_outputs('errors', main='valid')
                          )

    filtered_events = (valid_msgs | 'Filter Events' >> beam.Filter(filter_msgs))

    raw_events = (filtered_events | 'Prepare Raw Event Row for BQ ' >> beam.Map(get_raw_values))

    agg_events = (filtered_events
                  | f'Streaming Window for {c["WINDOW_TIME"]} seconds' >> beam.WindowInto(window.FixedWindows(c['WINDOW_TIME']))
                  | 'Event Parser' >> beam.Map(get_agg_values)
                  | 'Event Aggregation' >> beam.CombinePerKey(sum)
                  | 'Prepare Aggregate Event Row for BQ' >> beam.Map(get_count)
                  )

    # Raw events are written to BigQuery using 'Load Jobs' every 10 minutes.
    write_result_raw = (raw_events | 'Write Raw Events to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                                                         project=c["PROJECT"],
                                                                                         dataset=c["DATASET_NAME"],
                                                                                         method='FILE_LOADS',
                                                                                         triggering_frequency=10))

    # Aggregated events are written to BigQuery using 'Streaming Inserts'.
    write_result_agg = (agg_events | 'Write Aggregate Results to BQ' >> beam.io.WriteToBigQuery(c["COUNT_TABLE"],
                                                                                                project=c["PROJECT"],
                                                                                                dataset=c["DATASET_NAME"],
                                                                                                create_disposition=CreateDisposition.CREATE_NEVER,
                                                                                                write_disposition=WriteDisposition.WRITE_APPEND,
                                                                                                insert_retry_strategy=RetryStrategy.RETRY_ALWAYS))
```

Error:

File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1493, in expand
42    'triggering_frequency can only be used with '
43ValueError: triggering_frequency can only be used with FILE_LOADS method of writing to BigQuery.

Based on @Iñigo's response, I added the flag, but it did not work either. Please see the details below.

```
if c['FILE_LOAD']:
    argv.append('--experiments=use_beam_bq_sink')

p = beam.Pipeline(argv=argv)

records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                          project=c["PROJECT"],
                                                          dataset=c["DATASET_NAME"],
                                                          method='FILE_LOADS',
                                                          triggering_frequency=c['FILE_LOAD_FREQUENCY'],
                                                          create_disposition=CreateDisposition.CREATE_NEVER,
                                                          write_disposition=WriteDisposition.WRITE_APPEND,
                                                          insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                                          )
```

Error from the Dataflow job:
```
Workflow failed. Causes: Because of the shape of your pipeline, the Cloud Dataflow job optimizer produced a job graph that is not updatable using the --update pipeline option. This is a known issue that we are working to resolve. See https://issuetracker.google.com/issues/118375066 for information about how to modify the shape of your pipeline to avoid this error. You can override this error and force the submission of the job by specifying the --experiments=allow_non_updatable_job parameter., The stateful transform named 'Write Errors to BQ/BigQueryBatchFileLoads/ImpulseSingleElementPC/Map(decode).out/FromValue/ReadStream' is in two or more computations.
```


EDIT: 08/11/2020

Added both of the flags mentioned as pipeline arguments.

```
INFO:root:Argument to Beam Pipeline: ['--project=xxxxxx', '--runner=DataflowRunner', '--job_name=df-pubsub-raw', '--save_main_session', '--staging_location=gs://staging/', '--temp_location=gs://temp/', '--network=dataflow-localnet', '--subnetwork=regions/us-central1/subnetworks/us-central1', '--region=us-central1', '[email protected]', '--no_use_public_ips', '--streaming', '--experiments=[allow_non_updatable_job, use_beam_bq_sink]']
INFO:root:File load enabled
INFO:root:Write using file load with frequency:5

  File "./dataflow_ps_stream_bq.py", line 133, in stream_to_bq
    write_disposition=WriteDisposition.WRITE_APPEND
  File "/usr/local/lib/python3.6/site-packages/apache_beam/pvalue.py", line 141, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 610, in apply
    transform.transform, pvalueish, label or transform.label)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 620, in apply
    return self.apply(transform, pvalueish)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 663, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 153, in apply
    return super(DataflowRunner, self).apply(transform, input, options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 198, in apply
    return m(transform, input, options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 228, in apply_PTransform
    return transform.expand(input)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1493, in expand
    'triggering_frequency can only be used with '
ValueError: triggering_frequency can only be used with FILE_LOADS method of writing to BigQuery.
```

Upvotes: 2

Views: 483

Answers (1)

Iñigo

Reputation: 2680

You need to add the flag `--experiments=use_beam_bq_sink`. This has been an issue for a while now; without that experiment, Dataflow overrides the load type.

You can see this in the Beam repo.
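
For reference, a minimal sketch of passing the experiment through `PipelineOptions` (the project, region, and bucket values here are just placeholders, not taken from your code):

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; substitute your own project, region, and buckets.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp',
    '--streaming',
    # Opt in to the Beam BigQuery sink so that method='FILE_LOADS'
    # and triggering_frequency are honored on Dataflow.
    '--experiments=use_beam_bq_sink',
])

with beam.Pipeline(options=options) as p:
    # ... build the pipeline here, e.g. the WriteToBigQuery steps from the question ...
    pass
```

Appending `'--experiments=use_beam_bq_sink'` to the argv list you already pass to `beam.Pipeline(argv=argv)`, as in your edit, should have the same effect.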

It also looks like there's an ongoing PR to improve the inserts, including this (I just skimmed over the code).

PS: Glad our blog post helped coming up with ideas :D

Upvotes: 1
