Reputation: 4472
I have a simple pipeline in the Dataflow 2.1 SDK that reads data from Pub/Sub and then applies a DoFn to it.
PCollection<MyClass> e = streamData.apply("ToE", ParDo.of(new MyDoFNClass()));
I get the following error from this pipeline:
java.lang.IllegalStateException: Unable to return a default Coder for ToEvents/ParMultiDo(MyDoFNClass).out0 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.X.X.model.MyClass.
MyClass, the output type of the DoFn, is defined below:
@DefaultCoder(AvroCoder.class)
public class MyClass{
public long id;
public HashMap<String,HashSet<String>> a;
@SerializedName("a")
public Integer Id;
@SerializedName("ae")
public String ae;
}
Upvotes: 10
Views: 11698
Reputation: 1366
Below is an excerpt about coders from the Beam programming guide:
The Beam SDKs require a coder for every PCollection in your pipeline. In most cases, the Beam SDK is able to automatically infer a Coder for a PCollection based on its element type or the transform that produces it, however, in some cases the pipeline author will need to specify a Coder explicitly, or develop a Coder for their custom type.
Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.
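To make the lookup idea concrete, here is a toy model in plain Java. It is not Beam's CoderRegistry API: ToyCoderRegistry, PlainEvent and SerializableEvent are hypothetical names, and coders are represented only by name strings. It loosely mirrors the behaviour relevant to this question, assuming (as I understand Beam 2.x) that explicitly registered coders win and that a type implementing Serializable can fall back to SerializableCoder when nothing else matches.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical toy model of coder lookup; NOT Beam's CoderRegistry API.
final class ToyCoderRegistry {
    // Explicit registrations: element type -> coder name.
    private final Map<Class<?>, String> registered = new LinkedHashMap<>();

    void register(Class<?> type, String coderName) {
        registered.put(type, coderName);
    }

    // Toy resolution order: explicit registration first,
    // then a Java-serialization fallback for Serializable types.
    Optional<String> lookup(Class<?> type) {
        String explicit = registered.get(type);
        if (explicit != null) {
            return Optional.of(explicit);
        }
        if (Serializable.class.isAssignableFrom(type)) {
            return Optional.of("SerializableCoder");
        }
        // Nothing matched: roughly where Beam reports "Unable to provide a Coder".
        return Optional.empty();
    }
}

class PlainEvent {}                                  // does not implement Serializable

class SerializableEvent implements Serializable {}  // eligible for the fallback

class ToyCoderRegistryDemo {
    public static void main(String[] args) {
        ToyCoderRegistry registry = new ToyCoderRegistry();
        registry.register(String.class, "StringUtf8Coder");
        System.out.println(registry.lookup(String.class));            // Optional[StringUtf8Coder]
        System.out.println(registry.lookup(PlainEvent.class));        // Optional.empty
        System.out.println(registry.lookup(SerializableEvent.class)); // Optional[SerializableCoder]
    }
}
```

In real Beam code the explicit registration step would be done through the pipeline's CoderRegistry (for example registerCoderForClass) rather than a map like this one.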
See the following link for the default coders used by the Beam libraries: https://beam.apache.org/documentation/programming-guide/#default-coders-and-the-coderregistry
If the type you use in your PCollections is not covered by the default coders, you may have to provide a custom coder for it. For example, the implementations of the PubsubIO.write()/PubsubIO.read() methods use a custom coder, PubsubMessagePayloadOnlyCoder.
Suppose you are converting a String into a PubsubMessage. You can supply this coder to your PCollection:
PCollection<PubsubMessage> pubsubMessagePCollection = pCollectionTuple.get(accountId);
pubsubMessagePCollection.setCoder(PubsubMessagePayloadOnlyCoder.of());
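As a rough illustration of what a payload-only coder does, the following plain-Java sketch encodes just the message body and drops the attributes. ToyCoder, ToyMessage and ToyPayloadOnlyCoder are hypothetical stand-ins, not Beam's Coder or PubsubMessage classes. Unlike a real Beam coder, it ignores the nested/outer encoding context and assumes the encoded payload is the only thing in the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical minimal coder contract; Beam's real base classes are Coder<T> / CustomCoder<T>.
interface ToyCoder<T> {
    void encode(T value, OutputStream out) throws IOException;

    T decode(InputStream in) throws IOException;
}

// Hypothetical stand-in for PubsubMessage: a byte payload plus string attributes.
final class ToyMessage {
    final byte[] payload;
    final Map<String, String> attributes;

    ToyMessage(byte[] payload, Map<String, String> attributes) {
        this.payload = payload;
        this.attributes = attributes;
    }
}

// Writes only the payload bytes; attributes are never encoded, so they are lost on decode.
final class ToyPayloadOnlyCoder implements ToyCoder<ToyMessage> {
    @Override
    public void encode(ToyMessage value, OutputStream out) throws IOException {
        out.write(value.payload);
    }

    @Override
    public ToyMessage decode(InputStream in) throws IOException {
        // Simplification: reads to end of stream, so the payload must be the last thing written.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buf.write(chunk, 0, n);
        }
        return new ToyMessage(buf.toByteArray(), Collections.emptyMap());
    }
}

final class ToyPayloadOnlyCoderDemo {
    // Encodes then decodes one message, wrapping I/O errors as unchecked.
    static ToyMessage roundTrip(ToyCoder<ToyMessage> coder, ToyMessage msg) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            coder.encode(msg, out);
            return coder.decode(new ByteArrayInputStream(out.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ToyMessage msg = new ToyMessage("hello".getBytes(StandardCharsets.UTF_8),
                Collections.singletonMap("origin", "test"));
        ToyMessage back = roundTrip(new ToyPayloadOnlyCoder(), msg);
        System.out.println(new String(back.payload, StandardCharsets.UTF_8)); // hello
        System.out.println(back.attributes.isEmpty());                        // true
    }
}
```

The trade-off this shows is the one implied by the coder's name: the encoding is compact, but any attributes on the message do not survive a round trip.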
Upvotes: 1
Reputation: 4472
Found the solution: I just needed to add implements Serializable to MyClass:
@DefaultCoder(AvroCoder.class)
public class MyClass implements Serializable {
public long id;
public HashMap<String,HashSet<String>> a;
@SerializedName("a")
public Integer Id;
@SerializedName("ae")
public String ae;
}
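Why does implements Serializable help? As far as I know, when no other coder can be inferred, Beam's CoderRegistry can fall back to SerializableCoder for any type that implements java.io.Serializable, and SerializableCoder encodes elements with standard Java serialization (ObjectOutputStream/ObjectInputStream). The sketch below reproduces that round trip with the JDK only, so you can check that a class, including its nested HashMap/HashSet fields, actually serializes. It assumes a dependency-free copy of MyClass with the Beam and Gson annotations removed; JavaSerializationRoundTrip is a hypothetical helper, and the explicit serialVersionUID is an addition that is not in the original class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

// Dependency-free copy of MyClass (Beam @DefaultCoder and Gson @SerializedName removed).
class MyClass implements Serializable {
    private static final long serialVersionUID = 1L; // added here; not in the original
    public long id;
    public HashMap<String, HashSet<String>> a;
    public Integer Id;
    public String ae;
}

final class JavaSerializationRoundTrip {
    // Encode with plain Java serialization, the mechanism SerializableCoder relies on.
    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decode the bytes back into a new object graph.
    static Object decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MyClass original = new MyClass();
        original.id = 42L;
        original.a = new HashMap<>();
        original.a.put("tags", new HashSet<>(Arrays.asList("x", "y")));
        original.Id = 7;
        original.ae = "event";

        MyClass copy = (MyClass) decode(encode(original));
        System.out.println(copy.id + " " + copy.Id + " " + copy.ae); // 42 7 event
        System.out.println(copy.a.get("tags").contains("y"));       // true
    }
}
```

One caveat, to my understanding: SerializableCoder is not deterministic, so Beam will reject it where a deterministic coder is required (for example, for the keys of a GroupByKey), and Java serialization is usually bulkier and slower than a schema-based coder such as AvroCoder.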
Upvotes: 27