Reputation: 6686
I am trying to use the new BigQuery Storage API to do streaming inserts from Golang. I understand, based on this page, that this API replaces the old BigQuery streaming insert API.
However, none of the examples in the docs show how to actually insert rows. In order to create an AppendRowsRequest, I have arrived at the following:
&storagepb.AppendRowsRequest{
    WriteStream: resp.Name,
    Rows: &storagepb.AppendRowsRequest_ProtoRows{
        ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
            WriterSchema: nil, // protobuf schema??
            Rows: &storagepb.ProtoRows{
                SerializedRows: [][]byte{}, // serialized protocol buffer data??
            },
        },
    },
}
What data should I put into the SerializedRows field above?
The storagepb.ProtoRows struct above is documented here. Unfortunately, all that is given is a link to the main overview page for protocol buffers.
Can anyone give me an example of using the new BigQuery Storage API to stream rows into BigQuery from Golang?
Upvotes: 5
Views: 5572
Reputation: 6686
With much help from the answers above, I have arrived at a working example, which is available on GitHub: https://github.com/alexflint/bigquery-storage-api-example
The main code is:
package main

import (
    "context"
    "fmt"
    "log"

    // note: these import paths assume the v1beta2 Storage Write API packages
    // used by the repository linked above
    storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/proto"
)

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable"
    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row{
    {Name: "John Doe", Age: 104},
    {Name: "Jane Doe", Age: 69},
    {Name: "Adam Smith", Age: 33},
}

func main() {
    ctx := context.Background()

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        TraceId:     trace, // identifies this client
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }

    log.Println("done")
}
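One caveat about the code above: it discards the AppendRowsResponse that stream.Recv returns, and an append can be rejected in that response even when Recv itself reports no error. A minimal sketch of inspecting it, as a replacement for the _, err = stream.Recv() lines, assuming the same v1beta2 storagepb types used above:

res, err := stream.Recv()
if err != nil {
    log.Fatal("AppendRows.Recv: ", err)
}
// the response carries either an error status or an append result
if st := res.GetError(); st != nil {
    log.Fatalf("append rejected: %s", st.GetMessage())
}
if result := res.GetAppendResult(); result != nil {
    // offset at which the rows were appended (unset when offsets are not used)
    log.Println("rows appended at offset", result.GetOffset().GetValue())
}

Since the stream was created as COMMITTED, there is no separate commit step: once the append result comes back, the rows are queryable.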
And the protocol buffer definition for the "Row" struct above is:
syntax = "proto3";
package tutorial;
option go_package = ".;main";
message Row {
string Name = 1;
int32 Age = 2;
}
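For completeness: the Row type referenced in the Go code is the struct that protoc generates from this message. Assuming the file is saved as row.proto (the name is just for this example) and protoc plus protoc-gen-go are installed, the Go code can be generated with something like:

$ protoc --go_out=. row.proto

With the go_package option above, this writes a row.pb.go file with package main into the current directory.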
You need to create a BigQuery dataset and table first, with a schema that corresponds to the protocol buffer. See the README in the repository linked above for how to do that.
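If it helps, the dataset and a table whose schema matches the Row message can be created with the bq tool along these lines (the names match the constants in the example):

$ bq mk mydataset
$ bq mk --table mydataset.mytable name:string,age:integer

The lowercase column names correspond to the Name and Age proto fields, as the query output below shows.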
After running the code above, the data shows up in BigQuery like this:
$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE
+------------+-----+
| name | age |
+------------+-----+
| John Doe | 104 |
| Jane Doe | 69 |
| Adam Smith | 33 |
+------------+-----+
Thanks all for the help!
Upvotes: 6
Reputation: 150
I found some documentation [1][2] about writing streams to a table, but I'm not really sure that this is what you're looking for. Keep in mind that storage/apiv1beta2 is currently in a beta state, so this may not be implemented yet or may simply lack documentation. If the documentation I attached doesn't help, we could open an issue in the public issue tracker to get row streaming correctly documented or implemented.
Upvotes: 1