Graham Polley

Reputation: 14791

Why is writing to BigQuery from a Dataflow/Beam pipeline slow?

We have a very simple pipeline that reads from GCS, applies a single ParDo, and then writes the results to BigQuery. It runs on GCP, autoscales up to 50 VMs, and does nothing fancy.

Reading all the data from GCS (~10B records, 700+ GB) and transforming it happens relatively quickly (within the first 7-10 minutes).

But when it gets to the BigQuery write (using BigQueryIO), it slows right down, even though it only has to write about 1M records (~60MB). This step alone takes ~20 minutes.

In addition to the slow write to BigQuery, the graph shows that step as "stopped" even though it succeeded (albeit extremely slowly). The step also looks overly complicated for a simple write to BigQuery (see picture below).

The bottleneck appears to be the operation BigQueryIO.Write/BatchLoads/WriteRename (see logs below).

Is there something I'm doing wrong in my code?

Code:

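// Imports assume the Beam 2.x Java SDK of the time (TextIO.CompressionType was later replaced by Compression).
import static org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED;
import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Pipeline {
    private static final String BIG_QUERY_TABLE = "<redacted>:<redacted>.melbourne_titles";
    private static final String BUCKET = "gs://<redacted>/*.gz";

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(DataflowPipelineOptions.class);
        options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
        // Fully qualified because this class is also named Pipeline.
        org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(options);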
        pipeline.apply(TextIO.read().from(BUCKET).withCompressionType(GZIP))
                .apply(ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        String input = c.element();
                        String title = input.split(",")[5];
                        if (title.toLowerCase().contains("melbourne")) {
                            TableRow tableRow = new TableRow();
                            tableRow.set("title", title);
                            c.output(tableRow);
                        }
                    }
                }))
                .apply(BigQueryIO.writeTableRows()
                        .to(BIG_QUERY_TABLE)
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(getSchema()));
        pipeline.run();
    }

    private static TableSchema getSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);
        return schema;
    }
}

Log snippet:

  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Create
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
  2017-08-25 (21:30:23) Starting 50 workers in australia-southeast1-a...
  2017-08-25 (21:30:23) Executing operation TextIO.Read/Read+ParDo(Anonymous)+BigQueryIO.Write/PrepareWrite/ParDo(Anonymous)...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TriggerIdCreation/Read(CreateSource)+BigQueryIO.Writ...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoad...
  2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
  2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
  2017-08-25 (21:31:22) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
  2017-08-25 (21:31:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
  2017-08-25 (21:38:10) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/CreateDataflowView
  2017-08-25 (21:38:13) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/CreateDataflowView
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Close
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Read+BigQueryIO.Write/BatchLoads/GroupByK...
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Create
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Create
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Close
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Close
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Read+BigQueryIO...
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Read+BigQueryIO...
  2017-08-25 (21:39:00) Executing operation s35-u80
  2017-08-25 (21:39:01) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/Flatten.PCollections
  2017-08-25 (21:39:03) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/CreateDataflowView
  2017-08-25 (21:39:06) Executing operation BigQueryIO.Write/BatchLoads/ResultsView/CreateDataflowView
  2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Create
  2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Create
  2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/Create.Values/Read(CreateSource)+BigQueryIO.Write/Ba...
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Close
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Close
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read+BigQueryIO...
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Read+BigQueryIO....
  2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
  2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
  2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/CreateDataflowView
  2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/WriteRename
  2017-08-25 (21:57:43) Stopping worker pool...
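
As a rough check on where the time goes, the gap between the last two log lines can be computed directly. This is a minimal sketch using only java.time; the class and method names are illustrative, and the two timestamps are copied from the log above.

```java
import java.time.Duration;
import java.time.LocalTime;

public class StepGap {
    // Elapsed time between two same-day log timestamps in HH:mm:ss form.
    static Duration gap(String start, String end) {
        return Duration.between(LocalTime.parse(start), LocalTime.parse(end));
    }

    public static void main(String[] args) {
        // WriteRename started at 21:39:46; the worker pool stopped at 21:57:43.
        Duration d = gap("21:39:46", "21:57:43");
        System.out.println(d.toMinutes() + "m " + (d.getSeconds() % 60) + "s");
    }
}
```

So WriteRename accounts for just under 18 of the roughly 27 minutes between the first log line and the worker pool stopping.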

Overly complex looking step:

[image: Dataflow job graph showing the expanded BigQueryIO.Write step]

Job Details: [image not available]

UPDATE

I think the problem is the sheer number of files that Dataflow is producing and that BigQuery subsequently has to load. It may only be 1M rows, but Dataflow is producing 850+ files to load:

  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_IF_NEEDED",
      "destinationTable" : {
        "datasetId" : "dataflow_on_a_tram",
        "projectId" : "grey-sort-challenge",
        "tableId" : "melbourne_titles"
      },
      "schema" : {
        "fields" : [ {
          "name" : "year",
          "type" : "STRING"
        }, {
          "name" : "month",
          "type" : "STRING"
        }, {
          "name" : "day",
          "type" : "STRING"
        }, {
          "name" : "wikimedia_project",
          "type" : "STRING"
        }, {
          "name" : "language",
          "type" : "STRING"
        }, {
          "name" : "title",
          "type" : "STRING"
        }, {
          "name" : "views",
          "type" : "INTEGER"
        } ]
      },
      "sourceFormat" : "NEWLINE_DELIMITED_JSON",
      "sourceUris" : [
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/51221a43-8fd8-417d-90ca-2f3c3e5789d2",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/5e1c3cb8-20d1-45ef-b0bb-209645c36093",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0ed8d240-2bc2-4c8b-808d-792540448c73",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/d7a1fefe-6dd8-4f30-bf97-040c3692e448",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/b7c4d9a8-d45d-4cc6-b375-291e6435ed53",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/17a7bbf4-5695-4188-b03a-3ef5cda8607c",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/783af461-c114-4a41-aa5f-ed1c7db86bab",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/dad046fc-eabf-4212-83f1-7d7fa71075c1",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/7b9ffec1-7424-4248-83b4-98a4ef4233b9",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/bb297232-8e84-4a14-9dc6-3efde1b2b586",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0693972a-1319-4637-af9f-8a4a3d5cb0f7",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/41b1e722-f76c-404d-a71b-bd36c09e8a06",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/19cfd89e-c9ee-4221-aee1-b3503dbcd93b",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/574467f2-5771-479a-b213-2941225a24bd",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/4d872304-0f42-47f2-89cf-b3a3f856ca67",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/1c086246-8eec-4bbe-be98-b01abb181d33",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/9439f5f4-5020-471d-b631-e1a3fea1584f",

[..]851 files!
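
To put that file count in perspective, a back-of-the-envelope calculation helps. The sketch below assumes the round figures quoted earlier (about 1M rows and about 60MB, taken here as 60,000,000 bytes) spread evenly over 851 files; the even split and the class and method names are assumptions for illustration only.

```java
public class FileSizeEstimate {
    // Integer average of a total spread evenly across a number of files.
    static long perFile(long total, long files) {
        return total / files;
    }

    public static void main(String[] args) {
        long files = 851;
        System.out.println("rows per file:  " + perFile(1_000_000L, files));
        System.out.println("bytes per file: " + perFile(60_000_000L, files));
    }
}
```

That works out to roughly 1,200 rows and about 70 KB per file, so each individual file is tiny.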

Upvotes: 2

Views: 1747

Answers (1)

Reuven Lax

Reputation: 781

One thing to keep in mind is that BigQuery does not guarantee the latency of load jobs. If many other load jobs are issued at the same time, your job might sit in a queue until it is scheduled. If you can run this job again, we should be able to help you inspect the BigQuery load job itself to see what is happening.
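
If the load job's statistics are available, time spent queued can be separated from time spent loading. The BigQuery jobs API reports creationTime, startTime and endTime as milliseconds since the epoch. The sketch below assumes those three values have already been fetched; the class name is illustrative and the numbers in main are made up, not taken from the job above.

```java
import java.time.Duration;

public class LoadJobTiming {
    // Time the job spent waiting to be scheduled.
    static Duration queued(long creationMs, long startMs) {
        return Duration.ofMillis(startMs - creationMs);
    }

    // Time the job spent actually running.
    static Duration running(long startMs, long endMs) {
        return Duration.ofMillis(endMs - startMs);
    }

    public static void main(String[] args) {
        // Hypothetical timestamps (epoch millis), for illustration only.
        long creation = 1_503_661_186_000L;
        long start = creation + 900_000L; // scheduled 15 minutes after creation
        long end = start + 120_000L;      // ran for 2 minutes
        System.out.println("queued:  " + queued(creation, start).toMinutes() + " min");
        System.out.println("running: " + running(start, end).toMinutes() + " min");
    }
}
```

A queued time that is large relative to the running time would point at scheduling delay rather than at the load itself.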

Upvotes: 1