Reputation: 476
I am working on a Beam pipeline that processes JSON and writes it to BigQuery. The JSON looks like this:
{
  "message": [{
    "name": "abc",
    "itemId": "2123",
    "itemName": "test"
  }, {
    "name": "vfg",
    "itemId": "56457",
    "itemName": "Chicken"
  }],
  "publishDate": "2017-10-26T04:54:16.207Z"
}
I parse this using Jackson into the structure below.
class Feed {
    List<Message> messages;
    TimeStamp publishDate;
}
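For reference, the parsing step might look like the sketch below. This assumes Jackson's ObjectMapper; note that the JSON key is "message" while the Java field is "messages", so an explicit mapping such as @JsonProperty would be needed (the variable names here are hypothetical):

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: Feed and Message are the classes from the question.
// Because the JSON key is "message" and the field is "messages",
// the field would need an annotation like:
//     @JsonProperty("message")
//     List<Message> messages;
ObjectMapper mapper = new ObjectMapper();
Feed feed = mapper.readValue(json, Feed.class);
```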
public class Message implements Serializable {

    private static final long serialVersionUID = 1L;
    private String key;
    private String value;
    private Map<String, String> eventItemMap = new HashMap<>();
}
The eventItemMap property translates the list of maps into a single map containing all the key-value pairs together, because the messages property is parsed as a list of HashMap objects, one per key/value pair.
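As a minimal illustration of that translation (the class and method names here are hypothetical), a list of single-pair maps can be merged into one map like this:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EventItemMapDemo {
    // Collapses a list of maps (one per key/value pair, as produced
    // by parsing the message objects) into a single map holding all pairs.
    static Map<String, String> collapse(List<Map<String, String>> entries) {
        Map<String, String> merged = new HashMap<>();
        entries.forEach(merged::putAll);
        return merged;
    }

    public static void main(String[] args) {
        List<Map<String, String>> parsed = List.of(
            Map.of("name", "abc"),
            Map.of("itemId", "2123"),
            Map.of("itemName", "test"));
        System.out.println(collapse(parsed));
    }
}
```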
Now, in my pipeline, I convert the collection to a
PCollection<KV<String, Feed>>
to write it to different tables based on a property in the class. I have written a transform to do this. The requirement is to create multiple TableRows based on the number of Message objects. Along with publishDate, I have a few more properties in the JSON that would be added to the TableRow together with each message's properties. So the table would be as follows.
id, name, field1, field2, message1.property1, message1.property2...
id, name, field1, field2, message2.property1, message2.property2...
I tried to create the transformation below, but I am not sure how it will output multiple rows based on the message list.
private class BuildRowListFn extends DoFn<KV<String, Feed>, List<TableRow>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
        Feed feed = context.element().getValue();
        List<Message> messages = feed.getMessage();
        List<TableRow> rows = new ArrayList<>();
        messages.forEach((message) -> {
            TableRow row = new TableRow();
            row.set("column1", feed.getPublishDate());
            row.set("column2", message.getEventItemMap().get("key1"));
            row.set("column3", message.getEventItemMap().get("key2"));
            rows.add(row);
        });
    }
}
But this will also be a List, to which I won't be able to apply the BigQueryIO.write transform.
Thanks @jkff. I have now changed the code as you mentioned in the second paragraph, calling context.output(row) inside messages.forEach after setting the table row:
List<Message> messages = feed.getMessage();
messages.forEach((message) -> {
    TableRow row = new TableRow();
    row.set("column2", message.getEventItemMap().get("key1"));
    context.output(row);
});
Now, when I try to write this collection to BigQuery as
rows.apply(BigQueryIO.writeTableRows().to(getTable(projectId, datasetId, tableName)).withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
I am getting the below exception.
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at com.chefd.gcloud.analytics.pipeline.MyPipeline.main(MyPipeline.java:284)
Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:759)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
Please help.
Thank you.
Upvotes: 3
Views: 6439
Reputation: 17913
It seems that you are assuming that a DoFn can output only a single value per element. This is not the case: it can output any number of values per element - no values, one value, many values, etc. A DoFn can even output values to multiple PCollections.
In your case, you simply need to call c.output(row) for every row in your @ProcessElement method, for example: rows.forEach(c::output). Of course, you'll also need to change the type of your DoFn to DoFn<KV<String, Feed>, TableRow>, because the type of elements in its output PCollection is TableRow, not List<TableRow> - you're just producing multiple elements into the collection for every input element, but that doesn't change the type.
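Under those assumptions (the Feed/Message classes and column names come from the question), the corrected DoFn might look like this sketch:

```java
// Sketch of the corrected transform: the output element type is TableRow,
// and context.output(...) is called once per message, so a single input
// Feed can produce any number of output rows.
// Feed, Message, getMessage() and getEventItemMap() are taken from the
// question; the column names are placeholders.
private static class BuildRowsFn extends DoFn<KV<String, Feed>, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        Feed feed = context.element().getValue();
        for (Message message : feed.getMessage()) {
            TableRow row = new TableRow();
            row.set("column1", feed.getPublishDate());
            row.set("column2", message.getEventItemMap().get("key1"));
            row.set("column3", message.getEventItemMap().get("key2"));
            // One output per message: multiple TableRows per Feed.
            context.output(row);
        }
    }
}
```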
An alternative method would be to do what you currently did, also do c.output(rows), and then apply Flatten.iterables() to flatten the PCollection<List<TableRow>> into a PCollection<TableRow> (you might need to replace List with Iterable to get it to work). But the other method is easier.
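As a sketch of that alternative (the pipeline variable names here are hypothetical), the list-producing DoFn keeps an Iterable<TableRow> output type and Flatten.iterables() undoes the nesting:

```java
// Sketch: a DoFn that emits one Iterable<TableRow> per Feed (as in the
// question's original code), followed by Flatten.iterables() to obtain
// a flat PCollection of individual rows.
// "feeds" is an assumed upstream PCollection<KV<String, Feed>>.
PCollection<Iterable<TableRow>> rowLists =
    feeds.apply(ParDo.of(new BuildRowListFn()));
PCollection<TableRow> rows = rowLists.apply(Flatten.iterables());
// "rows" can now be passed to BigQueryIO.writeTableRows().
```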
Upvotes: 5