Andrew Turner

Reputation: 11

Dataflow DynamicDestinations unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite

I am trying to use DynamicDestinations to write to a partitioned BigQuery table, where each partition is addressed as mytable$yyyyMMdd. If I bypass DynamicDestinations and pass a hardcoded table name to .to(), the write works. With DynamicDestinations, however, I get the following exception:

java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1@6fff253c
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:51)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.expand(PrepareWrite.java:36)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:297)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:987)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:972)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:659)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
at com.homedepot.payments.monitoring.eventprocessor.MetricsAggregator.main(MetricsAggregator.java:82)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableReference
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

And here is the code:

PCollection<Event> rawEvents = pipeline
    .apply("ReadFromPubSub",
        PubsubIO.readProtos(EventOuterClass.Event.class)
                .fromSubscription(OPTIONS.getSubscription())
    )
    .apply("Parse", ParDo.of(new ParseFn()))
    .apply("ExtractAttributes", ParDo.of(new ExtractAttributesFn()));


EventTable table = new EventTable(OPTIONS.getProjectId(), OPTIONS.getMetricsDatasetId(), OPTIONS.getRawEventsTable());
rawEvents.apply(BigQueryIO.<Event>write()
    .to(new DynamicDestinations<Event, String>() {

        private static final long serialVersionUID = 1L;

        @Override
        public TableSchema getSchema(String destination) {
            return table.schema();
        }

        @Override
        public TableDestination getTable(String destination) {
            return new TableDestination(table.reference(), null);
        }

        @Override
        public String getDestination(ValueInSingleWindow<Event> element) {
            String dayString = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).toString();
            return table.reference().getTableId() + "$" + dayString;
        }
    })
    .withFormatFunction(new SerializableFunction<Event, TableRow>() {
        public TableRow apply(Event event) {
            TableRow row = new TableRow();
            Event evnt = (Event) event;
            row.set(EventTable.Field.VERSION.getName(), evnt.getVersion());
            row.set(EventTable.Field.TIMESTAMP.getName(), evnt.getTimestamp() / 1000);
            row.set(EventTable.Field.EVENT_TYPE_ID.getName(), evnt.getEventTypeId());
            row.set(EventTable.Field.EVENT_ID.getName(), evnt.getId());
            row.set(EventTable.Field.LOCATION.getName(), evnt.getLocation());
            row.set(EventTable.Field.SERVICE.getName(), evnt.getService());
            row.set(EventTable.Field.HOST.getName(), evnt.getHost());
            row.set(EventTable.Field.BODY.getName(), evnt.getBody());
            return row;
        }
    })
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
);

Any pointers in the correct direction would be greatly appreciated. Thanks!

Upvotes: 1

Views: 2645

Answers (2)

Alan Cabrera

Reputation: 732

Looks like you're trying to fill a specific partition based on the event. Why not use:

SerializableFunction<ValueInSingleWindow<Event>, TableDestination>?
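Whichever hook is used, the function ultimately has to turn an element's timestamp into a partition decorator. Note that the getDestination method in the question calls toString() on the formatter itself, which does not format any date. The sketch below shows only the string computation, using java.time instead of Joda and no Beam classes; the class name PartitionSpec, the project:dataset.table spec layout, and the sample project/dataset/table names are assumptions for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class PartitionSpec {
    // Day formatter pinned to UTC so partitions do not depend on the worker's time zone.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Builds "project:dataset.table$yyyyMMdd" for the day containing epochMillis.
    static String forTimestamp(String project, String dataset, String table, long epochMillis) {
        String day = DAY.format(Instant.ofEpochMilli(epochMillis));
        return project + ":" + dataset + "." + table + "$" + day;
    }

    public static void main(String[] args) {
        // Hypothetical names; epoch 0 falls on 1970-01-01 UTC.
        System.out.println(forTimestamp("my-project", "metrics", "raw_events", 0L));
        // prints my-project:metrics.raw_events$19700101
    }
}
```

Inside a Beam function, the resulting string would presumably be passed to a TableDestination, with the timestamp taken from the element or its window.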

Upvotes: 1

Scott Wegner

Reputation: 7493

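Judging from the exception message and the code above, the EventTable instance captured by your anonymous DynamicDestinations class holds a TableReference, which is not serializable. Beam serializes the DynamicDestinations object while the pipeline is being constructed (the SerializableUtils.clone call in the stack trace), so everything it captures has to be serializable as well.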

One workaround is to convert the anonymous DynamicDestinations into a static nested class whose constructor stores only the serializable pieces of the EventTable that its methods need.

For example:

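private static class EventDestinations extends DynamicDestinations<Event, String> {
  // Caution: TableSchema comes from the same API model package as TableReference,
  // so verify that it serializes; if it does not, keep a serializable form instead.
  private final TableSchema schema;
  // TableDestination is built once here, so the TableReference itself is not retained.
  private final TableDestination destination;
  private final String tableId;

  private EventDestinations(EventTable table) {
    // Copy out what is needed at construction time; do not keep the EventTable.
    this.schema = table.schema();
    this.destination = new TableDestination(table.reference(), null);
    this.tableId = table.reference().getTableId();
  }

  // ..
}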
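The effect can be reproduced without Beam. The following is a minimal sketch in plain Java, assuming hypothetical stand-in classes (FakeTableReference, HoldsReference, HoldsTableId) that are not part of any library: an object that keeps a non-serializable field fails Java serialization, while one that copies out only a String succeeds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class CaptureDemo {
    // Hypothetical stand-in for TableReference: deliberately NOT Serializable.
    static class FakeTableReference {
        final String tableId;
        FakeTableReference(String tableId) { this.tableId = tableId; }
    }

    // Keeps the non-serializable object as a field, like the anonymous class in the question.
    static class HoldsReference implements Serializable {
        private static final long serialVersionUID = 1L;
        final FakeTableReference ref;
        HoldsReference(FakeTableReference ref) { this.ref = ref; }
    }

    // Copies out only the serializable piece in the constructor.
    static class HoldsTableId implements Serializable {
        private static final long serialVersionUID = 1L;
        final String tableId;
        HoldsTableId(FakeTableReference ref) { this.tableId = ref.tableId; }
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeTableReference ref = new FakeTableReference("raw_events");
        System.out.println(canSerialize(new HoldsReference(ref))); // prints false
        System.out.println(canSerialize(new HoldsTableId(ref)));   // prints true
    }
}
```

The same check is a cheap way to test a real DynamicDestinations subclass before submitting the pipeline.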

Upvotes: 3
