I'm facing a strange problem when using streaming inserts in Dataflow. My JSON has many nested records and arrays. I set up the pipeline with the STREAMING_INSERTS method and a dead-letter handler class (DeadLettersHandler) to handle the errors.
formattedWaiting.apply("Insert Bigquery ",
        BigQueryIO.<KV<TableRow, String>>write()
            .to(customOptions.getOutputTable())
            // Only the TableRow (the key of each KV) is written to BigQuery.
            .withFormatFunction(kv -> kv.getKey())
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withSchemaFromView(schema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            // Retry only transient errors; rows that fail permanently go to the failed-inserts output.
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .withoutValidation()
            .withTimePartitioning(new TimePartitioning().setField(customOptions.getPartitionField().get()))
            .withClustering(clusteringFieldsList)
            // Required for getFailedInsertsWithErr() below.
            .withExtendedErrorInfo())
    .getFailedInsertsWithErr()
    .apply("Taking 1 element insertion", Sample.<BigQueryInsertError>any(1))
    .apply("Insertion errors", ParDo.of(new DeadLettersHandler()));
The problem is that with the streaming insert method, some rows are not inserted into the table and I get this error:
Repeated record with name: XXXX added outside of an array.
I double-checked the JSON that causes the problem and everything seems fine. The strange part is that when I comment out the withMethod line, the row is inserted with no issue at all.
I don't know why the pipeline behaves this way.
The JSON looks like this:
{
  "parameters": {
    "parameter": [
      {
        "subParameter": [
          {
            "value": "T",
            "key": "C"
          },
          {
            "value": "1",
            "key": "SEQUENCE_NUMBER"
          },
          {
            "value": "1",
            "key": "SEQUENCE_NUMBER"
          }
        ],
        "value": "C",
        "key": "C"
      },
      {
        "subParameter": [
          {
            "value": "T",
            "key": "C"
          },
          {
            "value": "1",
            "key": "SEQUENCE_NUMBER"
          },
          {
            "value": "2",
            "key": "SEQUENCE_NUMBER"
          }
        ],
        "value": "C",
        "key": "C"
      }
    ]
  }
}
The BigQuery schema should be fine, because I can insert the data when the streaming insert line in the BigQueryIO is commented out.
Any ideas, fellows?
Thanks in advance!
Just an update to this question.
The problem was with the schema declaration and the JSON itself.
We defined the parameters column as RECORD REPEATED, but parameters is an object in the JSON example.
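To make the mismatch concrete, here is a minimal, stdlib-only sketch of the shape rule behind the error: a REPEATED field must receive a JSON array, while a NULLABLE or REQUIRED RECORD receives a single object. The class FieldModeCheck and its matchesMode method are hypothetical, and plain java.util maps and lists stand in for TableRow (which is itself a Map<String, Object>). This is a simplified model for illustration, not how BigQueryIO or BigQuery actually validate rows.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks whether a JSON-like value has the shape a field mode expects. */
final class FieldModeCheck {

    /**
     * REPEATED fields must hold an array (modelled here as java.util.List).
     * NULLABLE fields hold a single value or null; REQUIRED fields hold a single non-null value.
     * This simplified check treats a null REPEATED value as a mismatch.
     */
    static boolean matchesMode(Object value, String mode) {
        switch (mode) {
            case "REPEATED":
                return value instanceof List;
            case "NULLABLE":
                return value == null || !(value instanceof List);
            case "REQUIRED":
                return value != null && !(value instanceof List);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Same shape as the question's JSON: "parameters" is a single object, not an array.
        Map<String, Object> row = Map.of(
                "parameters", Map.of("parameter", List.of(Map.of("key", "C", "value", "C"))));

        Object parameters = row.get("parameters");
        System.out.println("REPEATED ok? " + matchesMode(parameters, "REPEATED")); // false
        System.out.println("NULLABLE ok? " + matchesMode(parameters, "NULLABLE")); // true
    }
}
```

The object fails the REPEATED check, which corresponds to the "added outside of an array" error, and passes as NULLABLE; those two outcomes map directly onto the two fixes.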
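So we have two options here:
1. Change the parameters column from RECORD REPEATED to RECORD NULLABLE.
2. Keep the column as RECORD REPEATED and change the parameters object instead. For this option you have to transform the JSON and wrap the object in square brackets so that it is treated as an array. Example: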
{
  "parameters": [
    {
      "parameter": [
        {
          "subParameter": [
            {
              "value": "T",
              "key": "C"
            },
            {
              "value": "1",
              "key": "SEQUENCE_NUMBER"
            },
            {
              "value": "1",
              "key": "SEQUENCE_NUMBER"
            }
          ],
          "value": "C",
          "key": "C"
        }
      ]
    }
  ]
}
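For the second option, the wrapping can be done on each row before it reaches BigQueryIO, for example in a DoFn that runs before the write. Below is a minimal, stdlib-only sketch of that step. The class WrapAsArray and its wrapAsArray method are hypothetical, and java.util.Map stands in for TableRow; since TableRow implements Map<String, Object>, the same logic should carry over, but treat that as an assumption rather than a tested pipeline.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: ensures a given field of a row holds an array. */
final class WrapAsArray {

    /**
     * Returns a copy of {@code row} in which {@code field} holds a list:
     * a single object is wrapped in a one-element list, an existing list is kept as-is,
     * and an absent or null field is left untouched.
     */
    static Map<String, Object> wrapAsArray(Map<String, Object> row, String field) {
        // Work on a copy: Beam does not allow a DoFn to mutate its input element.
        Map<String, Object> copy = new LinkedHashMap<>(row);
        Object value = copy.get(field);
        if (value != null && !(value instanceof List)) {
            List<Object> wrapped = new ArrayList<>();
            wrapped.add(value);
            copy.put(field, wrapped);
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> parameters = Map.of(
                "parameter", List.of(Map.of("key", "C", "value", "C")));
        Map<String, Object> row = Map.of("parameters", parameters);

        Map<String, Object> fixed = wrapAsArray(row, "parameters");
        // "parameters" is now a one-element list containing the original object.
        System.out.println(fixed.get("parameters") instanceof List); // true
    }
}
```

Copying the row rather than modifying it in place keeps the helper safe to call inside a DoFn, and leaving existing lists alone makes the transform idempotent if some producers already send an array.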