Reputation: 11
I am trying to use DynamicDestinations to write to a partitioned table in BigQuery where the partition name is mytable$yyyyMMdd. If I bypass dynamicdestinations and supply a hardcoded table name in .to()
, it works; however, with dynamicdestinations I get the following exception:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1@6fff253c
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:51)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:36)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:297)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:987)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:972)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:659)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
at com.homedepot.payments.monitoring.eventprocessor.MetricsAggregator.main(MetricsAggregator.java:82)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableReference
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
And here is the code:
PCollection<Event> rawEvents = pipeline
.apply("ReadFromPubSub",
PubsubIO.readProtos(EventOuterClass.Event.class)
.fromSubscription(OPTIONS.getSubscription())
)
.apply("Parse", ParDo.of(new ParseFn()))
.apply("ExtractAttributes", ParDo.of(new ExtractAttributesFn()));
EventTable table = new EventTable(OPTIONS.getProjectId(), OPTIONS.getMetricsDatasetId(), OPTIONS.getRawEventsTable());
rawEvents.apply(BigQueryIO.<Event>write()
.to(new DynamicDestinations<Event, String>() {
private static final long serialVersionUID = 1L;
@Override
public TableSchema getSchema(String destination) {
return table.schema();
}
@Override
public TableDestination getTable(String destination) {
return new TableDestination(table.reference(), null);
}
@Override
public String getDestination(ValueInSingleWindow<Event> element) {
String dayString = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).toString();
return table.reference().getTableId() + "$" + dayString;
}
})
.withFormatFunction(new SerializableFunction<Event, TableRow>() {
public TableRow apply(Event event) {
TableRow row = new TableRow();
Event evnt = (Event) event;
row.set(EventTable.Field.VERSION.getName(), evnt.getVersion());
row.set(EventTable.Field.TIMESTAMP.getName(), evnt.getTimestamp() / 1000);
row.set(EventTable.Field.EVENT_TYPE_ID.getName(), evnt.getEventTypeId());
row.set(EventTable.Field.EVENT_ID.getName(), evnt.getId());
row.set(EventTable.Field.LOCATION.getName(), evnt.getLocation());
row.set(EventTable.Field.SERVICE.getName(), evnt.getService());
row.set(EventTable.Field.HOST.getName(), evnt.getHost());
row.set(EventTable.Field.BODY.getName(), evnt.getBody());
return row;
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
);
Any pointers in the correct direction would be greatly appreciated. Thanks!
Upvotes: 1
Views: 2645
Reputation: 732
Looks like you're trying to fill a specific partition based on the event. Why not use:
SerializableFunction<ValueInSingleWindow<Event>, TableDestination>
?
Upvotes: 1
Reputation: 7493
From inspecting the exception message and the code above, it seems that the EventTable
field used within your anonymous DynamicDestinations
class contains a TableReference
field which is not serializable.
One workaround would be to convert the anonymous DynamicDestinations
to a static inner class and define a constructor which stores only the serializable pieces of the EventTable
needed to implement the interface.
For example:
private static class EventDestinations extends DynamicDestinations<Event, String> {
private final TableSchema schema;
private final TableDestination destination;
private final String tableId;
private EventDestinations(EventTable table) {
this.schema = table.schema();
this.destination = new TableDestination(table.reference(), null);
this.tableId = table.reference().getTableId();
}
// ..
}
Upvotes: 3