shadow chris

Reputation: 391

Dataflow DoFn will not serialize with PipelineOptions

I'm trying to pass a PipelineOptions interface to a Dataflow DoFn so that the DoFn can reinstantiate some non-serializable things it needs, but Dataflow is unable to serialize the DoFn when I make it hold an instance of my PipelineOptions subinterface. Is there something I need to do to the Options interface to make it serialize correctly?

I know it is an option to write custom serialization + deserialization code (as in https://gist.github.com/jlewi/f1cd323dc88bd58601ef and "How to fix Dataflow unable to serialize my DoFn?"), but the PipelineOptions class explicitly says it should be serializable, and I would prefer not to write serialization and deserialization code in every DoFn that uses this options object.

Options class snippet:

public interface Options 
extends BigtableOptions, BigtableScanOptions, OfflineModuleOptions, Serializable {...}

DoFn definition:

public class RunEventGeneratorsDoFn extends DoFn<...,...> {
    private OfflinePipelineRunner.Options options;
....
}

Serialization exception when the options field is not marked transient:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [my DoFn]
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.<init>(ParDo.java:720)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.of(ParDo.java:678)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:596)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:563)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:558)
    at [dofn instantiation line]
Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
    ... 7 more
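The "Caused by" line shows what is actually failing: the SDK implements PipelineOptions as a java.lang.reflect.Proxy backed by ProxyInvocationHandler, and Java serialization writes the proxy's invocation handler, not the interface, so marking the interface Serializable does not help. A self-contained sketch reproducing the same failure (the interface and handler names here are illustrative, not SDK classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySerializationDemo {
  // An options-style interface that extends Serializable, analogous to
  // the Options interface in the question.
  interface MyOptions extends Serializable {
    String getValue();
  }

  // A handler that, like the SDK's ProxyInvocationHandler in this stack
  // trace, does NOT implement Serializable.
  static class NonSerializableHandler implements InvocationHandler {
    public Object invoke(Object proxy, Method m, Object[] args) {
      return "hello";
    }
  }

  public static void main(String[] args) throws Exception {
    MyOptions options = (MyOptions) Proxy.newProxyInstance(
        MyOptions.class.getClassLoader(),
        new Class<?>[] {MyOptions.class},
        new NonSerializableHandler());

    try (ObjectOutputStream out =
        new ObjectOutputStream(new ByteArrayOutputStream())) {
      // Serializing the proxy serializes its InvocationHandler field,
      // which is the part that fails.
      out.writeObject(options);
    } catch (NotSerializableException e) {
      System.out.println("NotSerializableException: " + e.getMessage());
    }
  }
}
```

Making the interface Serializable only declares intent; the concrete object behind the proxy still has to be serializable, and the SDK's handler is not.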

Upvotes: 1

Views: 4803

Answers (1)

Ben Chambers

Reputation: 6130

The actual pipeline options object should not be included as a field in a specific DoFn or PTransform. Instead, pass in the value of the specific options you want to access.

See this question for more context: "How to get PipelineOptions in composite PTransform in Beam 2.0?".
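A minimal sketch of this pattern, using a plain Serializable class in place of a real DoFn so it runs without the SDK on the classpath; the field and option names (bigtableInstanceId, scanCacheSize) are illustrative, not from the question's Options interface:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ExtractOptionsDemo {
  // Stand-in for the DoFn: it holds only the plain values extracted from
  // the options at pipeline-construction time, never the options proxy.
  static class RunEventGeneratorsFn implements Serializable {
    private final String bigtableInstanceId;  // hypothetical option value
    private final int scanCacheSize;          // hypothetical option value

    RunEventGeneratorsFn(String bigtableInstanceId, int scanCacheSize) {
      this.bigtableInstanceId = bigtableInstanceId;
      this.scanCacheSize = scanCacheSize;
    }

    String describe() {
      return bigtableInstanceId + ":" + scanCacheSize;
    }
  }

  public static void main(String[] args) throws Exception {
    // At construction time you would call the option getters, e.g.
    // new RunEventGeneratorsFn(options.getBigtableInstanceId(), ...);
    // here the values are hard-coded.
    RunEventGeneratorsFn fn = new RunEventGeneratorsFn("my-instance", 100);

    // Round-trip through Java serialization, as the SDK does when it
    // clones a DoFn. This succeeds because every field is a plain
    // serializable value rather than a PipelineOptions proxy.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(fn);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      RunEventGeneratorsFn copy = (RunEventGeneratorsFn) in.readObject();
      System.out.println("deserialized: " + copy.describe());
    }
  }
}
```

The values needed inside processElement travel with the DoFn, and the non-serializable clients get rebuilt from them on the worker.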

Upvotes: 3
