Reputation: 33
I am trying to use the existing Google sample code (PubSubToBigQuery.java) to parse Stackdriver log messages and push them into BigQuery.

The problem is that one of the Stackdriver log fields is named "@type", which BigQuery does not accept as a column name, so I created the table in BigQuery with a different field name (mytype). Now when I run PubSubToBigQuery.java, I predictably get an error saying the "@type" field is not found.

How can I rename the column from "@type" to "mytype" inside my Beam code?
Upvotes: 1
Views: 1011
Reputation: 183
If you want just to put Stackdriver logs unchanged into BigQuery you can use the built in export feature of Stackdriver and create a sink to BigQuery: https://cloud.google.com/logging/docs/export/
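As a sketch of that route, a sink that exports matching log entries to a BigQuery dataset can be created with gcloud; the sink name, PROJECT_ID, DATASET_ID, and the filter below are placeholders you would replace with your own values:

```shell
# Create a Stackdriver (Cloud Logging) sink that exports matching log
# entries to a BigQuery dataset. Replace PROJECT_ID / DATASET_ID and the
# filter with your own values.
gcloud logging sinks create my-bq-sink \
  bigquery.googleapis.com/projects/PROJECT_ID/datasets/DATASET_ID \
  --log-filter='resource.type="gce_instance"'
```

gcloud prints the sink's writer identity (a service account) when the sink is created; that account needs write access on the target dataset (e.g. the BigQuery Data Editor role) before the export can deliver rows.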
If the export is not feasible for you, you can modify the transformation logic in Beam.
In PubSubToBigQuery.java, BigQueryIO uses a TableRow PCollection as input to write the messages into BigQuery. The PubsubMessageToTableRow PTransform converts each PubsubMessage into a TableRow, with some error handling. You can add a ParDo with a custom DoFn that changes the column name in the created TableRow. The process element method could look something like this:
@ProcessElement
public void processElement(@Element TableRow row, OutputReceiver<TableRow> outputReceiver) {
    // Copy the row so the input element (which Beam treats as immutable)
    // is not mutated, then move the value from "@type" to "mytype".
    TableRow clone = row.clone();
    Object value = clone.get("@type");
    clone.remove("@type");
    clone.set("mytype", value);
    outputReceiver.output(clone);
}
If you use the unchanged PubSubToBigQuery.java linked above, you can apply this ParDo to the jsonToTableRowOut.get(TRANSFORM_OUT) PCollection, somewhere around line 323 in the code.
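Since TableRow implements java.util.Map, the rename inside the DoFn is plain map-key manipulation. Here is a self-contained sketch of that logic with a HashMap standing in for TableRow (the class and method names are hypothetical, chosen for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the DoFn's rename step. TableRow implements
// java.util.Map, so the same get/remove/put sequence applies to it; a plain
// HashMap is used here so the sketch runs without the Beam or BigQuery SDKs.
public class RenameSketch {
    public static Map<String, Object> renameField(Map<String, Object> row, String from, String to) {
        // Work on a copy so the original element stays untouched,
        // matching the clone() call in the DoFn.
        Map<String, Object> copy = new HashMap<>(row);
        Object value = copy.remove(from);   // remove(...) returns the old value
        if (value != null) {
            copy.put(to, value);            // TableRow.set(...) is equivalent to put(...)
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("@type", "type.googleapis.com/google.cloud.audit.AuditLog");
        Map<String, Object> out = renameField(row, "@type", "mytype");
        System.out.println(out.containsKey("mytype") && !out.containsKey("@type")); // prints "true"
    }
}
```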
Upvotes: 1