I have a use case where we need to create a PCollection containing fields of an abstract data type. How should the schema and coder be defined in such cases?
The data is read from JSON files in some data source (local, S3, etc.).
For example:
PCollection<Customer>, where Customer is defined as:
class Customer {
    Gender gender;
}
interface Gender {
}
class Female implements Gender {
}
In the pipeline, the schema for Customer is set as:
pipeline.getSchemaRegistry().getSchema(Customer.class)
For testing, I created the PCollection using
pipeline.apply(Create.of(getCustomers())), where getCustomers() returns a List<Customer>.
The pipeline fails with the following exception:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Gender.<init>()
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:371)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:339)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:219)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at section5.ComplexCombine1.main (ComplexCombine1.java:147)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Gender.<init>()
at org.apache.avro.specific.SpecificData.newInstance (SpecificData.java:353)
at org.apache.avro.specific.SpecificData.newRecord (SpecificData.java:369)
at org.apache.avro.reflect.ReflectData.newRecord (ReflectData.java:901)
at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:212)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
at org.apache.beam.sdk.coders.AvroCoder.decode (AvroCoder.java:330)
at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:118)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:101)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:95)
at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:272)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:329)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:324)
at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs.outputAccumulators (MultiStepCombine.java:322)
Caused by: java.lang.NoSuchMethodException: entities.Gender.<init>()
at java.lang.Class.getConstructor0 (Class.java:3110)
at java.lang.Class.getDeclaredConstructor (Class.java:2206)
at org.apache.avro.specific.SpecificData.newInstance (SpecificData.java:347)
at org.apache.avro.specific.SpecificData.newRecord (SpecificData.java:369)
at org.apache.avro.reflect.ReflectData.newRecord (ReflectData.java:901)
at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:212)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
at org.apache.beam.sdk.coders.AvroCoder.decode (AvroCoder.java:330)
at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:118)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:101)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:95)
at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:272)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:329)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:324)
at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs.outputAccumulators (MultiStepCombine.java:322)
at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs$DoFnInvoker.invokeFinishBundle (Unknown Source)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.finishBundle (SimpleDoFnRunner.java:242)
at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle (SimplePushbackSideInputDoFnRunner.java:125)
at org.apache.beam.runners.direct.ParDoEvaluator.finishBundle (ParDoEvaluator.java:269)
at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.finishBundle (DoFnLifecycleManagerRemovingTransformEvaluator.java:73)
at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:193)
at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:131)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.025 s
[INFO] Finished at: 2021-05-12T00:30:14+05:30
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project ApacheBeam1: An exception occured while executing the Java class. java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Y.<init>() -> [Help 1]
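The innermost "Caused by" in the trace can be reproduced without Beam. The sketch below is plain Java, not Beam code: Gender and Female are stand-ins mirroring the question's types, and hasNoArgConstructor is a hypothetical helper that performs the same getDeclaredConstructor() lookup the trace shows SpecificData.newInstance making. An interface declares no constructors, so the lookup fails for Gender but succeeds for the concrete Female class.

```java
import java.lang.reflect.Constructor;

public class InterfaceConstructorDemo {
    // Stand-in for the question's entities.Gender: an interface has no constructors at all.
    interface Gender {}

    // Stand-in for the concrete implementation; it gets an implicit no-arg constructor.
    static class Female implements Gender {}

    /** Hypothetical helper: true if the class declares a no-argument constructor. */
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            // Same lookup that Avro's reflection-based decoding performs before instantiating.
            Constructor<?> ctor = type.getDeclaredConstructor();
            return ctor != null;
        } catch (NoSuchMethodException e) {
            // This is the exception that surfaces as "entities.Gender.<init>()" in the trace.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Gender: " + hasNoArgConstructor(Gender.class));
        System.out.println("Female: " + hasNoArgConstructor(Female.class));
    }
}
```

Running main prints "Gender: false" followed by "Female: true".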
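I think the failure might be due to the pipeline not being able to properly determine the Coder for the Customer type. The trace shows AvroCoder being used, which decodes by reflection and calls the no-argument constructor of each field's declared type; Gender is an interface, so it has none. When using the Create transform with a custom type, you may have to specify a Coder using the withCoder method.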
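A minimal sketch of the kind of coder the answer suggests, assuming the concrete Gender subtypes are known up front: it writes a one-byte tag naming the subtype and dispatches on that tag when decoding, so nothing ever tries to instantiate the interface. It uses only java.io so it runs standalone; the Male class and the tag values are hypothetical. In a real pipeline, encode and decode would be the overrides of a class extending Beam's org.apache.beam.sdk.coders.CustomCoder<Customer>, passed via Create.of(getCustomers()).withCoder(...).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CustomerCoderSketch {
    interface Gender {}
    static final class Female implements Gender {}
    static final class Male implements Gender {} // hypothetical second subtype

    static final class Customer {
        final Gender gender;
        Customer(Gender gender) { this.gender = gender; }
    }

    // Hypothetical tag values identifying the concrete subtype on the wire.
    private static final int TAG_NULL = 0;
    private static final int TAG_FEMALE = 1;
    private static final int TAG_MALE = 2;

    // Mirrors CustomCoder.encode(T, OutputStream): write a tag instead of relying on reflection.
    static void encode(Customer value, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        Gender g = value.gender;
        if (g == null) {
            data.writeByte(TAG_NULL);
        } else if (g instanceof Female) {
            data.writeByte(TAG_FEMALE);
        } else if (g instanceof Male) {
            data.writeByte(TAG_MALE);
        } else {
            throw new IOException("Unknown Gender subtype: " + g.getClass().getName());
        }
        // Flush without closing: a coder must not close the stream it is handed.
        data.flush();
    }

    // Mirrors CustomCoder.decode(InputStream): read the tag and build the matching concrete class.
    static Customer decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int tag = data.readByte();
        switch (tag) {
            case TAG_NULL: return new Customer(null);
            case TAG_FEMALE: return new Customer(new Female());
            case TAG_MALE: return new Customer(new Male());
            default: throw new IOException("Unknown Gender tag: " + tag);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(new Customer(new Female()), bytes);
        Customer roundTripped = decode(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(roundTripped.gender.getClass().getSimpleName());
    }
}
```

Running main prints "Female". A Customer with more fields would encode each remaining field after the tag. Note that the trace fails inside MultiStepCombine while encoding combine accumulators, so if an accumulator also holds a Gender, the CombineFn's getAccumulatorCoder may need to return a similar coder.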
See here for more details. See here for an example.