sebastian-montero

Reputation: 389

I get 504 Deadline Exceeded in Apache Beam using ReadFromSpanner

I am building an application in Apache Beam and Python that runs in Google Dataflow. I am using the ReadFromSpanner method in apache_beam.io.gcp.experimental.spannerio. This works for most of my Spanner tables, but the really large ones (>16M rows) tend to fail with the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
    for row in read_action(element['partitions']):
  File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
    self._consume_next()
  File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
    response = six.next(self._response_iterator)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
    for item in iterator:
  File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']

From my understanding, this error comes from the ReadFromSpanner operation because its workers have timed out.

To solve this I have tried several approaches, but the error persists.

My latest code is attached below. It includes a Transformation step for simple data wrangling on the rows.

 """Set pipeline arguments."""
    options = PipelineOptions(
        region=RUNNER_REGION,
        project=RUNNER_PROJECT_ID,
        runner=RUNNER,
        temp_location=TEMP_LOCATION,
        job_name=JOB_NAME,
        service_account_email=SA_EMAIL,
        setup_file=SETUP_FILE_PATH,
        disk_size_gb=500,
        num_workers=10,
        machine_type="n1-standard-2",
        save_main_session=True)

    """Build and run the pipeline."""
        with beam.Pipeline(options=options) as p:
            (p
             | "Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
             | "Transform elements into dictionary" >> beam.ParDo(Transformation)
             | "Write new records to BQ" >> WriteToBigQuery(
                 BIGQUERY_TABLE,
                 schema=SCHEMA,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
                 ) 
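For context, the Transformation referenced above is just a small callable that turns each Spanner row (a list of column values, in the order of the SELECT) into a dictionary keyed by BigQuery column name. A simplified sketch is below; the field names are illustrative rather than my real schema:

def Transformation(row):
    # Each element emitted by ReadFromSpanner is a list of column values,
    # so map them onto the BigQuery column names by position.
    yield {
        "id": row[0],
        "name": row[1],
        "updated_at": row[2],
    }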

A potential solution is to adjust the timeout settings; I have seen this exposed in the Java SDK but not in Python. How can I control the timeout in Python, or is there another solution to this issue?

Upvotes: 1

Views: 800

Answers (1)

sebastian-montero

Reputation: 389

I submitted this issue to the googleapis/python-spanner repo. The maintainers of the library were able to help me out and added retry and timeout options for reads and queries.
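Outside of Beam, these options can now be passed directly when querying Spanner with the client library. A rough sketch (assuming a recent google-cloud-spanner release that exposes the retry and timeout keywords, and reusing the placeholder constants from the question):

from google.api_core.retry import Retry
from google.cloud import spanner

client = spanner.Client(SPANNER_PROJECT_ID)
database = client.instance(SPANNER_INSTANCE_ID).database(SPANNER_DB)

with database.snapshot() as snapshot:
    # timeout is the per-attempt deadline in seconds; Retry controls how long
    # failed attempts keep being retried before giving up.
    results = snapshot.execute_sql(QUERY, timeout=3600, retry=Retry(deadline=7200))
    for row in results:
        pass  # process the row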

To solve the problem I reverse engineered the Apache Beam Spanner connector, apache_beam.io.gcp.experimental.spannerio, specifically the _ReadFromPartitionFn, so that I could include the timeout option.

The following code creates read-partition objects from Spanner and then reads from those partitions. Note that I pass the timeout argument to process_query_batch within readSpannerPartitions.

import apache_beam as beam
from google.cloud import spanner
from google.cloud.spanner_v1.database import BatchSnapshot


class createSpannerReadPartitions(beam.DoFn):
    def __init__(self, SPANNER_CONFIG):
        self.project = SPANNER_CONFIG['spanner_project']
        self.instance = SPANNER_CONFIG['spanner_instance']
        self.db = SPANNER_CONFIG['spanner_database']
        self.query = SPANNER_CONFIG['query']

    def setup(self):
        # Open a batch snapshot once per worker; its serialized form is passed
        # downstream so the readers can rebuild the same transaction.
        spanner_client = spanner.Client(self.project)
        spanner_instance = spanner_client.instance(self.instance)
        spanner_db = spanner_instance.database(self.db)
        self.snapshot = spanner_db.batch_snapshot()
        self.snapshot_dict = self.snapshot.to_dict()

    def process(self, element):
        partitioning_action = self.snapshot.generate_query_batches

        # Emit one element per query partition, together with the
        # transaction info needed to read it.
        for p in partitioning_action(self.query):
            yield {
                "partitions": p,
                "transaction_info": self.snapshot_dict}


class readSpannerPartitions(beam.DoFn):
    def __init__(self, SPANNER_CONFIG):
        self.project = SPANNER_CONFIG['spanner_project']
        self.instance = SPANNER_CONFIG['spanner_instance']
        self.db = SPANNER_CONFIG['spanner_database']
        self.query = SPANNER_CONFIG['query']

    def setup(self):
        spanner_client = spanner.Client(self.project)
        spanner_instance = spanner_client.instance(self.instance)
        self.spanner_db = spanner_instance.database(self.db)
        self.snapshot = self.spanner_db.batch_snapshot()
        self.snapshot_dict = self.snapshot.to_dict()

    def process(self, element):
        # Rebuild the batch snapshot created upstream from its serialized form.
        self.snapshot = BatchSnapshot.from_dict(
            self.spanner_db, element['transaction_info'])

        # timeout (in seconds) is what prevents the 504 Deadline Exceeded error.
        read_action = self.snapshot.process_query_batch
        for row in read_action(element['partitions'], timeout=86400):
            yield row

    def teardown(self):
        self.snapshot.close()

I then created the pipeline like so (the Reshuffle step redistributes the generated partitions across workers before they are read):

with beam.Pipeline(options=options) as p:
    p_read = (p | beam.Create(["Start pipeline"])
                | 'Generate Partitions' >> beam.ParDo(createSpannerReadPartitions(SPANNER_CONFIG))
                | 'Reshuffle' >> beam.Reshuffle()
                | 'Read From Partitions' >> beam.ParDo(readSpannerPartitions(SPANNER_CONFIG)))
    
    return p_read

This has been possible thanks to the maintainers of the googleapis/python-spanner repo.

Upvotes: 1
