Reputation: 391
I'm trying to pass a PipelineOptions interface to a Dataflow DoFn so that the DoFn can configure some non-serializable objects it needs to re-instantiate, but Dataflow seems unable to serialize the DoFn when I make it hold an instance of my PipelineOptions sub-interface. Is there something I need to do to the Options interface to make it serialize correctly?
I know writing custom serialization + deserialization code is an option (like in https://gist.github.com/jlewi/f1cd323dc88bd58601ef and "How to fix Dataflow unable to serialize my DoFn?"), but the PipelineOptions class explicitly says that it should be serializable, and I would prefer not to write serialization and deserialization code in every DoFn that uses this options object.
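For context, this is roughly the per-DoFn boilerplate I'd like to avoid (a sketch only; it assumes the options interface round-trips through Jackson the way the SDK serializes it when submitting a job, and the element types are placeholders):
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

public class RunEventGeneratorsDoFn extends DoFn<String, String> {
    // Marked transient so Java serialization skips the non-serializable proxy.
    private transient Options options;
    private String serializedOptions;

    public RunEventGeneratorsDoFn(Options options) {
        this.options = options;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        // Snapshot the options as JSON before default serialization runs.
        serializedOptions = new ObjectMapper().writeValueAsString(options);
        out.defaultWriteObject();
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        options = new ObjectMapper()
            .readValue(serializedOptions, PipelineOptions.class)
            .as(Options.class);
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // ... re-instantiate the non-serializable pieces from options here ...
    }
}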
Options interface snippet:
public interface Options
    extends BigtableOptions, BigtableScanOptions, OfflineModuleOptions, Serializable { ... }
DoFn definition:
public class RunEventGeneratorsDoFn extends DoFn<...,...> {
    private OfflinePipelineRunner.Options options;
    ...
}
Serialization exception when the Options field is not marked transient:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [my DoFn]
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91)
at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.<init>(ParDo.java:720)
at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.of(ParDo.java:678)
at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:596)
at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:563)
at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:558)
at [dofn instantiation line]
Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
... 7 more
Upvotes: 1
Views: 4803
Reputation: 6130
The actual pipeline options object should not be included as a field in a specific DoFn or PTransform. Instead, pass in the value of the specific options you want to access.
See "How to get PipelineOptions in composite PTransform in Beam 2.0?" for some more context.
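A minimal sketch of what that looks like, using a made-up option getter (getBigtableInstanceId) and placeholder element types:
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class RunEventGeneratorsDoFn extends DoFn<String, String> {
    // A plain, serializable value pulled out of the options up front,
    // instead of a field holding the PipelineOptions proxy itself.
    private final String bigtableInstanceId;

    public RunEventGeneratorsDoFn(String bigtableInstanceId) {
        this.bigtableInstanceId = bigtableInstanceId;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Rebuild any non-serializable clients here from the plain value as needed.
        c.output(c.element() + ":" + bigtableInstanceId);
    }
}

// At pipeline construction time:
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
input.apply(ParDo.of(new RunEventGeneratorsDoFn(options.getBigtableInstanceId())));
Since the DoFn now only holds a plain String, default Java serialization works without any custom code and the ProxyInvocationHandler never needs to be serialized.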
Upvotes: 3