KM_

Reputation: 41

Dataflow pipeline stuck at reading from Pub/Sub

After one day of working completely fine (streaming data from Pub/Sub, flattening the data, and writing the rows to BigQuery), the Dataflow pipeline has started reporting errors like this:


Processing stuck in step s01 for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:170)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:191)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
  at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1269)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

These errors increment the reported time, reaching up to 25m00s with the same stack trace.

I haven't had any luck with Stackdriver either, because those errors are not displayed there.

Here is my pipeline:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window


class parse_pubsub(beam.DoFn):
    def process(self, element):
        # Flatten data ...
        for row in final_rows:
            yield row


def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic', required=True,
        help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
    parser.add_argument(
        '--output_table', required=True,
        help=('Output BigQuery table for results, specified as PROJECT:DATASET.TABLE '
              'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    # table_schema = '-------'

    with beam.Pipeline(argv=pipeline_args) as p:
        lines = ( p | 'Read from PubSub' >> beam.io.ReadFromPubSub(known_args.input_topic)
                    | 'Parse data' >> beam.ParDo(parse_pubsub())
                    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                        known_args.output_table,
                        schema=table_schema,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                    )
                )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Can this be a worker issue? Should I start the job with more workers? Is it something that can be prevented in the code?

Upvotes: 1

Views: 1427

Answers (1)

Andrew Pilloud

Reputation: 478

Unfortunately, Python streaming Dataflow jobs are still in beta. One of the limitations of the beta is that several of the IO connectors run on the Dataflow backend, and their logs are not accessible to users.

There is at least one issue for which I've seen similar stack traces, BEAM-5791, which was fixed in Beam 2.9.0. If you aren't on it already, try upgrading to the latest version of Beam.
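As a quick sanity check, a minimal sketch like the one below prints the SDK version in the environment you launch the job from, which is the version Dataflow stages to the workers:

import apache_beam as beam

# The locally installed Python SDK is the one staged to the Dataflow workers,
# so this is the version that matters for the BEAM-5791 fix (2.9.0 or newer).
print(beam.__version__)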

Another common cause is permission problems. Make sure the Dataflow service account still has access to your Pub/Sub topic.
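If you want to check this from code rather than the console, a sketch along these lines lists the IAM bindings on the topic (this assumes the google-cloud-pubsub client library with its v2-style request dict, and the project/topic names are placeholders); look for the service account your job runs as, with a role that still allows subscribing:

from google.cloud import pubsub_v1

client = pubsub_v1.PublisherClient()
# Placeholder project and topic names -- substitute your own.
topic_path = client.topic_path('my-project', 'my-topic')

# Print every role binding on the topic so you can confirm the Dataflow
# service account still appears with an appropriate role.
policy = client.get_iam_policy(request={"resource": topic_path})
for binding in policy.bindings:
    print(binding.role, list(binding.members))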

If you are still having trouble after that, you'll need to file a ticket with Google Cloud support. They can look at the backend logs for your job and help you find the cause of the problem.

Upvotes: 2
