Reputation: 1464
I'm having an odd issue running my Dataflow pipeline. I have written my own Coder, but swapping it out for AvroCoder, SerializableCoder, and other examples produces the same issue.
Here is the exception I get after trying to launch the pipeline on the Dataflow service in streaming mode:
Exception in thread "main" java.lang.RuntimeException: Unable to deserialize Coder: ModelCoder. Check that a suitable constructor is defined. See Coder for details.
at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:113)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureCoderSerializable(DirectPipelineRunner.java:901)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensurePCollectionEncodable(DirectPipelineRunner.java:861)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.setPCollectionValuesWithMetadata(DirectPipelineRunner.java:789)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.setPCollection(DirectPipelineRunner.java:776)
at com.google.cloud.dataflow.sdk.io.TextIO.evaluateReadHelper(TextIO.java:786)
at com.google.cloud.dataflow.sdk.io.TextIO.access$000(TextIO.java:118)
at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound$1.evaluate(TextIO.java:327)
at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound$1.evaluate(TextIO.java:323)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:706)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
at io.momentum.demo.models.pipeline.PlatformPipeline.main(PlatformPipeline.java:96)
Caused by: java.lang.IllegalStateException: Sub-class com.google.cloud.dataflow.sdk.util.CoderUtils$Jackson2Module$Resolver MUST implement `typeFromId(DatabindContext,String)
at com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase.typeFromId(TypeIdResolverBase.java:77)
at com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._findDeserializer(TypeDeserializerBase.java:156)
at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:106)
at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:91)
at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:142)
at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:42)
at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3760)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2042)
at com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2529)
at com.google.cloud.dataflow.sdk.util.Serializer.deserialize(Serializer.java:98)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:110)
... 18 more
My Coder implementation just wraps AvroCoder and hooks into some of our own code:
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// AppModel, CoderInternals, and TypedSerializedModel are our own classes.
public final class ModelCoder<M extends AppModel> extends AtomicCoder<M> {
  public static <T extends AppModel> ModelCoder<T> of(Class<T> clazz) {
    return new ModelCoder<>(clazz);
  }

  @JsonCreator
  @SuppressWarnings("unchecked")
  public static ModelCoder<?> of(@JsonProperty("kind") String classType) throws ClassNotFoundException {
    Class<?> clazz = Class.forName(classType);
    return of((Class<? extends AppModel>) clazz);
  }

  private String kind;

  public ModelCoder(Class<M> type) {
    this.kind = type.getSimpleName();
  }

  @Override
  public void encode(M value, OutputStream outStream, Context context) throws IOException, CoderException {
    CoderInternals.encode(value, outStream, context, new TypeReference<TypedSerializedModel<M>>() { });
  }

  @Override
  public M decode(InputStream inStream, Context context) throws IOException, CoderException {
    return CoderInternals.decode(inStream, context, new TypeReference<TypedSerializedModel<M>>() { });
  }

  @Override
  public CloudObject asCloudObject() {
    CloudObject co = super.asCloudObject();
    co.set("kind", kind);
    return co;
  }
}
The coder works as expected when it is invoked to encode(..) or decode(..) an AppModel, but this exception occurs anyway.
Upvotes: 1
Views: 585
Reputation: 3010
You need a static method tagged with @JsonCreator so that the service can instantiate your coder on the workers. You also shouldn't override asCloudObject(): it determines how your Coder is serialized and sent to the workers, and your override will just send a serialized AvroCoder.
Take a look at NullableCoder.java (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java) for an example of a Coder that wraps another one.
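To make the shape concrete, here is a minimal sketch of that pattern applied to the coder from the question, assuming the Dataflow 1.x SDK (StandardCoder, PropertyNames). AppModel is the question's own type, and the question's CoderInternals logic is replaced here by plain delegation to the wrapped AvroCoder, so treat this as an illustration of the wiring rather than a drop-in fix:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;

public final class ModelCoder<M extends AppModel> extends StandardCoder<M> {
  // Normal entry point: wrap an AvroCoder for the concrete model class.
  public static <T extends AppModel> ModelCoder<T> of(Class<T> clazz) {
    return of(AvroCoder.of(clazz));
  }

  public static <T extends AppModel> ModelCoder<T> of(Coder<T> valueCoder) {
    return new ModelCoder<>(valueCoder);
  }

  // Invoked via Jackson when the coder is deserialized on a worker; the
  // wrapped coder comes back as a component encoding.
  @JsonCreator
  @SuppressWarnings("unchecked")
  public static ModelCoder<?> of(
      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
    return of((Coder<? extends AppModel>) components.get(0));
  }

  private final Coder<M> valueCoder;

  private ModelCoder(Coder<M> valueCoder) {
    this.valueCoder = valueCoder;
  }

  @Override
  public void encode(M value, OutputStream outStream, Context context)
      throws IOException, CoderException {
    valueCoder.encode(value, outStream, context);
  }

  @Override
  public M decode(InputStream inStream, Context context)
      throws IOException, CoderException {
    return valueCoder.decode(inStream, context);
  }

  // Declaring the wrapped coder as a component lets the default
  // asCloudObject() serialize it; no override is needed.
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Collections.singletonList(valueCoder);
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    valueCoder.verifyDeterministic();
  }
}

Because the wrapped coder is exposed through getCoderArguments(), the default asCloudObject() serializes it and the @JsonCreator factory receives it back on the worker, so there is no need to round-trip a "kind" string by hand.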
Upvotes: 4