Reputation: 145
I have data with repeated key-value (String,String) record pairs as one of the fields in a Big Query table schema.
I am trying to add these repeated records using the approach here: http://sookocheff.com/post/bigquery/creating-a-big-query-table-java-api/
The table schema created for the repeated record field looks like this:
TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
I am inserting data like this as part of a DoFn:
Map<String,String> record = ... // key-value pairs
List<TableRow> rawFields = new ArrayList<>();
record.forEach((k,v)->
rawFields.add(new TableRow().set("key",k).set("value", v))
);
TableRow row = new TableRow();
// row has other fields, omitted here
row.set("rawFields", rawFields);
The DoFn is in my dataflow pipeline just before the BigQueryIO.Write:
.apply(BigQueryIO.Write
.named("WriteLBLogLines")
.to("xxx:yyy.zzz")
.withSchema(mySchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
When I try and run this through Dataflow I get the following error:
errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0 at file: gs://xxxxxxxxxx/12271006010671464167/dax-tmp-2016-06-28_14_47_26-12271006010671462904-S04-1-303c4f638f6b411b/-shard-00002-of-00003-try-021aff4c448b3177-endshard.json. Repeated field must be imported as a JSON array. Field: rawFields.
What is wrong with my approach? It seems I am not using the right approach for inserting repeated records.
Upvotes: 3
Views: 5292
Reputation: 3214
I've attempted to reproduce the problem with the following code, but it executes successfully. Are there other aspects of the schema that could be at issue?
List<TableFieldSchema> fields = new ArrayList<>();
TableFieldSchema column = new TableFieldSchema().setName("rawFields");
column.setType("RECORD");
List<TableFieldSchema> list = new ArrayList<>();
list.add(new TableFieldSchema().setName("key").setType("STRING"));
list.add(new TableFieldSchema().setName("value").setType("STRING"));
column.setFields(list);
column.setMode("REPEATED");
fields.add(column);
TableSchema schema = new TableSchema().setFields(fields);
TableRow row = new TableRow();
List<TableRow> rawFields = new ArrayList<>();
rawFields.add(new TableRow().set("key","foo").set("value", "bar"));
row.set("rawFields", rawFields);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> c =
p.apply(Create.of(row, row).withCoder(TableRowJsonCoder.of()));
c.apply(BigQueryIO.Write.named("BigQuery-Write")
.to(options.getOutput())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withSchema(schema));
p.run();
Upvotes: 4