Satyam Khare

Reputation: 61

Apache Beam creating PCollection of Custom Entities/Models with Abstract Fields

I have a use case where I need to create a PCollection whose elements contain fields of an abstract type (an interface). How should the schema and coder be defined in such cases?

The data is read from JSON files in some data source (local filesystem, S3, etc.).

For example:

PCollection<Customer>, where Customer is defined as

    class Customer {
      Gender gender;
    }

    interface Gender {
    }

    class Female implements Gender {
    }

In the pipeline, I set the schema for Customer as:

    pipeline.getSchemaRegistry().getSchema(Customer.class)

For testing, I created the PCollection using

    pipeline.apply(Create.of(getCustomers()))

where getCustomers() returns a List<Customer>.

The pipeline fails with the following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Gender.<init>()
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:371)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:339)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:219)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at section5.ComplexCombine1.main (ComplexCombine1.java:147)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Gender.<init>()
    at org.apache.avro.specific.SpecificData.newInstance (SpecificData.java:353)
    at org.apache.avro.specific.SpecificData.newRecord (SpecificData.java:369)
    at org.apache.avro.reflect.ReflectData.newRecord (ReflectData.java:901)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:212)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
    at org.apache.beam.sdk.coders.AvroCoder.decode (AvroCoder.java:330)
    at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:101)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:95)
    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:272)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:329)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:324)
    at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs.outputAccumulators (MultiStepCombine.java:322)
Caused by: java.lang.NoSuchMethodException: entities.Gender.<init>()
    at java.lang.Class.getConstructor0 (Class.java:3110)
    at java.lang.Class.getDeclaredConstructor (Class.java:2206)
    at org.apache.avro.specific.SpecificData.newInstance (SpecificData.java:347)
    at org.apache.avro.specific.SpecificData.newRecord (SpecificData.java:369)
    at org.apache.avro.reflect.ReflectData.newRecord (ReflectData.java:901)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:212)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
    at org.apache.beam.sdk.coders.AvroCoder.decode (AvroCoder.java:330)
    at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:101)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:95)
    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:272)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:329)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:324)
    at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs.outputAccumulators (MultiStepCombine.java:322)
    at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs$DoFnInvoker.invokeFinishBundle (Unknown Source)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.finishBundle (SimpleDoFnRunner.java:242)
    at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle (SimplePushbackSideInputDoFnRunner.java:125)
    at org.apache.beam.runners.direct.ParDoEvaluator.finishBundle (ParDoEvaluator.java:269)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.finishBundle (DoFnLifecycleManagerRemovingTransformEvaluator.java:73)
    at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:193)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:131)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  4.025 s
[INFO] Finished at: 2021-05-12T00:30:14+05:30
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project ApacheBeam1: An exception occured while executing the Java class. java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Y.<init>() -> [Help 1]
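The innermost cause in the trace can be reproduced without Beam or Avro. The sketch below is a minimal, hypothetical check (the class and method names are made up for illustration) that performs the same lookup the trace shows, Class.getDeclaredConstructor(), on the question's Gender interface and on a concrete Female class:

```java
import java.lang.reflect.Constructor;

public class ConstructorCheck {
    // Mirrors the model from the question.
    interface Gender {}

    static class Female implements Gender {}

    // Returns true if reflection can find a no-argument constructor on the type.
    // The trace shows Avro's SpecificData.newInstance doing this same lookup
    // via Class.getDeclaredConstructor() before instantiating a record.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            return ctor != null;
        } catch (NoSuchMethodException e) {
            // Interfaces declare no constructors, so this branch is taken for Gender.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Gender: " + hasNoArgConstructor(Gender.class));
        System.out.println("Female: " + hasNoArgConstructor(Female.class));
    }
}
```

It prints `Gender: false` and `Female: true`: an interface has no constructors at all, so a reflection-based decoder that instantiates the declared field type (Gender) rather than the runtime type (Female) fails with NoSuchMethodException.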

Upvotes: 3

Views: 1024

Answers (1)

chamikara

Reputation: 2024

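I think the failure is due to the pipeline not being able to properly determine a Coder for the Customer type. The stack trace shows AvroCoder being used, and Avro's reflection tries to instantiate Gender through a no-argument constructor, which an interface does not have. When using the Create transform with a custom type, you may have to specify a Coder explicitly using the withCoder method.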

See here for more details. See here for an example.
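One coder that could be passed to withCoder (an assumption here, not something the answer names) is SerializableCoder, e.g. `Create.of(getCustomers()).withCoder(SerializableCoder.of(Customer.class))`, which requires Customer and every Gender implementation to implement java.io.Serializable. SerializableCoder is built on standard Java serialization, which writes the concrete class of each object into the stream. The sketch below uses only java.io (no Beam) to show that a Customer whose field is declared as the Gender interface survives an encode/decode round trip with its Female instance intact; the class and method names are hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationRoundTrip {
    // The question's model, made Serializable so Java serialization can handle it.
    interface Gender extends Serializable {}

    static class Female implements Gender {}

    static class Customer implements Serializable {
        Gender gender; // declared as the interface, holds a concrete subtype at runtime

        Customer(Gender gender) {
            this.gender = gender;
        }
    }

    // Encode to bytes and decode again, similar to how the DirectRunner clones
    // elements with their coder (CoderUtils.clone in the trace).
    static Customer roundTrip(Customer in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream objIn =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Customer) objIn.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Customer decoded = roundTrip(new Customer(new Female()));
        // The concrete class name was recorded in the stream, so the field is restored as Female.
        System.out.println(decoded.gender.getClass().getSimpleName());
    }
}
```

This prints `Female`. Note that in the trace the failing AvroCoder sits inside a KvCoder emitted by MultiStepCombine, so it appears to be the combine step's accumulator coder rather than the Create output; if so, a CombineFn can supply a different coder by overriding getAccumulatorCoder.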

Upvotes: 2
