Alex Flint

Reputation: 6686

Using BigQuery Write API in Golang

I am trying to use the new BigQuery Storage Write API to do streaming inserts from Golang. My understanding, based on this page, is that this API replaces the old BigQuery streaming insert API.

However, none of the examples in the docs show how to actually insert rows. To create an AppendRowsRequest, I have arrived at the following:

&storagepb.AppendRowsRequest{
    WriteStream: resp.Name,
    Rows: &storagepb.AppendRowsRequest_ProtoRows{
        ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
            WriterSchema: nil, // protobuf schema??
            Rows: &storagepb.ProtoRows{
                SerializedRows: [][]byte{}, // serialized protocol buffer data??
            },
        },
    },
}

What data should I put into the SerializedRows field above?

The storagepb.ProtoRows struct above is documented here. Unfortunately, all that is given there is a link to the main overview page for protocol buffers.

Can anyone give me an example of using the new BigQuery Storage Write API to stream rows into BigQuery from Golang?

Upvotes: 5

Views: 5572

Answers (2)

Alex Flint

Reputation: 6686

With much help from the other answers here, I have arrived at a working example, which is available on GitHub: https://github.com/alexflint/bigquery-storage-api-example

The main code is:

package main

import (
    "context"
    "fmt"
    "log"

    // note: these import paths assume the v1beta2 storage client, which is
    // what the example repository linked above uses
    storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/proto"
)

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable"
    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row{
    {Name: "John Doe", Age: 104},
    {Name: "Jane Doe", Age: 69},
    {Name: "Adam Smith", Age: 33},
}

func main() {
    ctx := context.Background()

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        TraceId:     trace, // identifies this client
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }

    log.Println("done")
}

And the protocol buffer definition for the "Row" struct above is:

syntax = "proto3";

package tutorial;

option go_package = ".;main";

message Row {
    string Name = 1;
    int32 Age = 2;
}
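
The Go code for this message is generated with protoc and its Go plugin, roughly as follows (a sketch assuming the definition is saved as row.proto; that file name is mine, not necessarily the one used in the repository):

$ protoc --go_out=. row.proto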

You need to create a BigQuery dataset and table first, with a schema that corresponds to the protocol buffer. See the README in the repository linked above for how to do that.
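
For example, with the bq command-line tool, something along these lines should create a matching table (my own sketch using the dataset and table names from the constants above; see the README for the exact steps):

$ bq mk --dataset mydataset
$ bq mk --table mydataset.mytable name:STRING,age:INTEGER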

After running the code above, the data shows up in BigQuery like this:

$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+

Thanks all for the help!

Upvotes: 6

Carlos CB

Reputation: 150

I found some documentation [1][2] about writing streams to a table, but I'm not really sure it is what you're looking for. Keep in mind that storage/apiv1beta2 is currently in beta, so row streaming may not be implemented yet, or may simply lack documentation. If the documentation I attached doesn't help, we could open an issue in the public issue tracker to get row streaming correctly documented or implemented.

Upvotes: 1
