Reputation: 305
I may be missing something obvious, but for some reason I can't make PAssert and TestPipeline work with CoGroupByKey -- without it, everything works fine.
Here is a reference test file that reproduces the issue I'm facing. I tested with both Beam SDK 2.4 and 2.5.
For comparison, testWorking behaves as intended, while testBroken adds the following step:
// The following four lines cause the issue.
PCollectionTuple tuple =
    KeyedPCollectionTuple
        .of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
        .and(inTag2, pc2)
        .apply("CoGroupByKey", CoGroupByKey.<String>create())
        .apply("Some Merge DoFn",
            ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
                .withOutputTags(outTag1, TupleTagList.of(outTag2)));
The error I get is shown after the code below.
public class ReferenceTest {

  @Rule
  public final transient TestPipeline pipe1 = TestPipeline.create();

  @Rule
  public final transient TestPipeline pipe2 = TestPipeline.create();

  public static class String2KV extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // "key1:value1" -> ["key1", "value1"]
      String[] tokens = c.element().split(":");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }
  public static class MergeDoFn extends DoFn<KV<String, CoGbkResult>, String> {
    final TupleTag<String> inTag1;
    final TupleTag<String> inTag2;
    final TupleTag<String> outTag2;

    public MergeDoFn(TupleTag<String> inTag1, TupleTag<String> inTag2, TupleTag<String> outTag2) {
      this.inTag1 = inTag1;
      this.inTag2 = inTag2;
      this.outTag2 = outTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String val1 = c.element().getValue().getOnly(inTag1);
      String val2 = c.element().getValue().getOnly(inTag2);
      // outTag1 = main output
      // outTag2 = side output
      c.output(outTag2, val1 + "," + val2);
    }
  }
  @Test
  public void testWorking() {
    // Create two PCs for test.
    PCollection<String> pc1 =
        pipe1.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 =
        pipe1.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));

    pipe1.run();
  }
  @Test
  public void testBroken() {
    // Create two PCs for test.
    PCollection<String> pc1 =
        pipe2.apply("create pc1", Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 =
        pipe2.apply("create pc2", Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));

    TupleTag<String> inTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    // The following four lines cause the issue.
    PCollectionTuple tuple =
        KeyedPCollectionTuple
            .of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
            .and(inTag2, pc2)
            .apply("CoGroupByKey", CoGroupByKey.<String>create())
            .apply("Some Merge DoFn",
                ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
                    .withOutputTags(outTag1, TupleTagList.of(outTag2)));

    // Even without the following two PAsserts, the CoGBK step above causes the issue.
    PAssert.that(tuple.get(outTag1)).empty();
    PAssert.that(tuple.get(outTag2)).containsInAnyOrder("value1,value2");

    pipe2.run();
  }
}
Here's the error:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@217aa4f
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:121)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:85)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.registerComponents(CoderTranslation.java:107)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toKnownCoder(CoderTranslation.java:91)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:36)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:138)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:173)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:353)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
at exp.moloco.dataflow2.ReferenceTest.testBroken(ReferenceTest.java:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.HashMap.internalWriteEntries(HashMap.java:1789)
at java.util.HashMap.writeObject(HashMap.java:1363)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
... 54 more
Process finished with exit code 255
Has anyone had a similar issue with TestPipeline before? I haven't dug into it extensively yet, but I couldn't find any relevant information on using CoGroupByKey and TestPipeline together.
In production, the same code works fine for my team; we only hit this issue when we tried to add a few unit tests using TestPipeline and PAssert.
Any help will be appreciated!
Upvotes: 0
Views: 349
Reputation: 814
The root cause shown in the exception is `java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest`: some object is inadvertently holding a reference to the outer class ReferenceTest. The most likely culprits are the anonymous classes defined for the TupleTags inside the (non-static) test method, which capture the enclosing ReferenceTest instance. Try making them regular classes, or declaring them in a static context so they no longer capture the test instance.
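To illustrate why this happens (a minimal plain-Java sketch, not Beam-specific: `Tag` below is a hypothetical stand-in for TupleTag, which Beam likewise expects you to subclass anonymously), an anonymous subclass created from an instance context carries a hidden reference to its enclosing object, while one created from a static context does not. This is why declaring the tags as static fields, e.g. `static final TupleTag<String> inTag1 = new TupleTag<String>() {};`, is a common way to avoid the NotSerializableException:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Stand-in for TupleTag: a serializable class meant to be subclassed anonymously.
    static class Tag<T> implements Serializable {}

    // Created in a static context: the anonymous subclass has no enclosing instance.
    static final Tag<String> STATIC_TAG = new Tag<String>() {};

    // Created in an instance context: the anonymous subclass captures `this`
    // (the CaptureDemo instance), which is not Serializable.
    final Tag<String> instanceTag = new Tag<String>() {};

    // Returns true if Java serialization of `o` succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException (an IOException) is thrown when a captured
            // outer instance cannot be serialized.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(STATIC_TAG));                    // true
        System.out.println(serializes(new CaptureDemo().instanceTag)); // false
    }
}
```

The same reasoning applies to the test above: moving the four TupleTag declarations out of the test method into static fields of ReferenceTest should remove the captured reference that the DirectRunner fails to serialize.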
Upvotes: 1