DilTeam

Reputation: 2661

Inserting rows on BigQuery: InsertAllRequest Vs BigQueryIO.writeTableRows()

When I insert rows into BigQuery using writeTableRows, performance is really bad compared to InsertAllRequest. Clearly, something is not set up correctly.

Use case 1: I wrote a Java program to process the 'sample' Twitter stream using Twitter4j. When a tweet comes in, I write it to BigQuery using this:

insertAllRequestBuilder.addRow(rowContent);
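
For reference, that builder comes from the standard google-cloud-bigquery client; my setup is roughly the sketch below (dataset and table names are placeholders, and status is the incoming Twitter4j Status):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;

import java.util.HashMap;
import java.util.Map;

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId tableId = TableId.of("my_dataset", "tweets");  // placeholder names

// One map per tweet; keys must match the table's column names.
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("Id", status.getId());
rowContent.put("Text", status.getText());

InsertAllRequest.Builder insertAllRequestBuilder = InsertAllRequest.newBuilder(tableId);
insertAllRequestBuilder.addRow(rowContent);

// Streaming insert: each call lands rows in the table within seconds.
InsertAllResponse response = bigquery.insertAll(insertAllRequestBuilder.build());
if (response.hasErrors()) {
    System.err.println("Insert errors: " + response.getInsertErrors());
}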

When I run this program from my Mac, it inserts about 1000 rows per minute directly into the BigQuery table. I thought I could do better by running a Dataflow job on the cluster.

Use case 2: When a tweet comes in, I write it to a Google Cloud Pub/Sub topic. I run this from my Mac, which sends about 1000 messages every minute.

I wrote a Dataflow job that reads this topic and writes to BigQuery using BigQueryIO.writeTableRows(). I have an 8-machine Dataproc cluster. I started this job on the master node of this cluster with DataflowRunner. It's unbelievably slow! Like 100 rows every 5 minutes or so. Here's a snippet of the relevant code:

statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = new TableRow();
        Status status = c.element();
        row.set("Id", status.getId());
        row.set("Text", status.getText());
        row.set("RetweetCount", status.getRetweetCount());
        row.set("FavoriteCount", status.getFavoriteCount());
        row.set("Language", status.getLang());
        row.set("ReceivedAt", null);
        row.set("UserId", status.getUser().getId());
        row.set("CountryCode", status.getPlace().getCountryCode());
        row.set("Country", status.getPlace().getCountry());
        c.output(row);
    }
})) 
    .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            .withNumFileShards(1000)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
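
For completeness, here is roughly how I create the pipeline; the project, region, and bucket values below are placeholders:

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);        // workers are managed by Dataflow
options.setProject("my-gcp-project");           // placeholder
options.setRegion("us-central1");               // placeholder
options.setTempLocation("gs://my-bucket/tmp");  // placeholder
options.setStreaming(true);                     // unbounded Pub/Sub source

Pipeline pipeline = Pipeline.create(options);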

What am I doing wrong? Should I use a 'SparkRunner'? How do I confirm that it's running on all nodes of my cluster?

Upvotes: 2

Views: 835

Answers (1)

Felipe Hoffa

Reputation: 59175

With BigQuery you can either:

  • Stream data in. Low latency, up to 100k rows per second, has a cost.
  • Batch data in. Way higher latency, incredible throughput, totally free.

That's the difference you are experiencing. If you only want to ingest 1000 rows, batching will be noticeably slower. Ingesting 10 billion rows will be way faster through batching, and at no cost.
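
To make "batch" concrete: outside of a pipeline, it is just a BigQuery load job. A minimal sketch with the Java client (the bucket, dataset, and table names here are made up):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

// Load newline-delimited JSON files from GCS as a free batch load job.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
LoadJobConfiguration config = LoadJobConfiguration
        .newBuilder(TableId.of("my_dataset", "tweets"),  // placeholders
                    "gs://my-bucket/tweets/*.json")
        .setFormatOptions(FormatOptions.json())
        .build();
Job job = bigquery.create(JobInfo.of(config));
job.waitFor();  // blocks until the load job completes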

Dataflow/Beam's BigQueryIO.writeTableRows can either stream or batch data in.

With BigQueryIO.Write.Method.FILE_LOADS, the pasted code is choosing batch.
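
If you want the low-latency streaming path instead, switch the write method and drop the file-load-only settings; a sketch against your pasted pipeline (tweetRows stands for the PCollection<TableRow> produced by your ParDo, everything else stays the same):

tweetRows
    .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
            .withSchema(schema)
            // STREAMING_INSERTS is the default for unbounded sources, so
            // withTriggeringFrequency/withNumFileShards are no longer needed.
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));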

Upvotes: 2
