Reputation: 51
I want to group by a custom key. Here is my attempt so far.
We use a custom class as the key of our KV objects because we want GroupByKey to apply a more complicated condition than simple key matching on a String or similar.
```
PCollection<KV<Multikey, Iterable<SomeObject>>> pc2 =
    pc.apply(GroupByKey.<Multikey, SomeObject>create());
```
The match condition is expressed in the equals method.
```
class Multikey implements Serializable {
    List<String> keys = new ArrayList<>(); // multiple keys
    ......
    @Override
    public boolean equals(Object k) {
        ...join conditions
    }
}
```
But I get the following error:
java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:193)
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:107)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.coders.Coder$NonDeterministicException: org.apache.beam.sdk.coders.SerializableCoder@18b411b5 is not deterministic because:
Java Serialization may be non-deterministic.
at org.apache.beam.sdk.coders.SerializableCoder.verifyDeterministic(SerializableCoder.java:205)
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:191)
It seems that the order in which "keys" is serialized is the problem, so I implemented a custom serializer and tried various coders, but none of them worked.
Upvotes: 3
Views: 3005
Reputation: 51
Thank you. I read the documentation.
Let me explain why I want to use custom keys.
We want keys to match by disjunction ("or"), not by a normal combination of keys.
class Multikey implements Serializable {
    List<String> keys = new ArrayList<>();
    ........
    @Override
    public boolean equals(Object k) {
        if (k instanceof Multikey) {
            List<String> ky = new ArrayList<String>(((Multikey) k).keys);
            // Representation of disjunction
            ky.retainAll(keys);
            return !ky.isEmpty();
        } else {
            return false;
        }
    }
}
I read the documentation, and it seems that the key for GroupByKey needs to be a single deterministic value. Is it difficult to express a disjunction in grouping?
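One reason a disjunction is hard to use as a grouping key is that "shares at least one element" is not transitive, so it does not split keys into well-defined groups. A minimal plain-Java sketch of this, with no Beam dependency (the `overlaps` helper is hypothetical and only mirrors the `retainAll` logic above):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverlapDemo {
    // Hypothetical helper mirroring the equals() logic above:
    // true when the two key lists share at least one element.
    public static boolean overlaps(List<String> a, List<String> b) {
        List<String> common = new ArrayList<>(a);
        common.retainAll(b);
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        List<String> x = Arrays.asList("a", "b");
        List<String> y = Arrays.asList("b", "c");
        List<String> z = Arrays.asList("c", "d");

        System.out.println(overlaps(x, y)); // true  (share "b")
        System.out.println(overlaps(y, z)); // true  (share "c")
        System.out.println(overlaps(x, z)); // false (nothing in common)
    }
}
```

Here x matches y and y matches z, but x does not match z, so it is unclear whether the three belong in one group or two. An `equals()` defined this way also breaks the transitivity requirement of the `Object.equals` contract.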
Upvotes: 0
Reputation: 1725
Please note the GroupByKey documentation:
Two keys of type K are compared for equality not by regular Java Object.equals(java.lang.Object), but instead by first encoding each of the keys using the Coder of the keys of the input PCollection, and then comparing the encoded bytes. This admits efficient parallel evaluation. Note that this requires that the Coder of the keys be deterministic (see Coder.verifyDeterministic()). If the key Coder is not deterministic, an exception is thrown at pipeline construction time.
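To illustrate the rule quoted above, here is a plain-Java sketch with no Beam dependency. The `encode` helper is a hypothetical stand-in for a key Coder, and `sameGroup` is an assumed name for the byte comparison. It shows that `equals()` is never consulted, so keys that merely overlap do not end up in the same group.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

class ByteEqualityDemo {
    // Hypothetical stand-in for a key Coder: writes the list size,
    // then each key as a length-prefixed string, in list order.
    public static byte[] encode(List<String> keys) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(keys.size());
            for (String key : keys) {
                out.writeUTF(key);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Two keys count as the same only if their encoded bytes are identical.
    public static boolean sameGroup(List<String> a, List<String> b) {
        return Arrays.equals(encode(a), encode(b));
    }

    public static void main(String[] args) {
        // Identical contents: identical bytes.
        System.out.println(sameGroup(Arrays.asList("a", "b"), Arrays.asList("a", "b"))); // true
        // Overlapping contents ("b" is shared): bytes still differ.
        System.out.println(sameGroup(Arrays.asList("a", "b"), Arrays.asList("b", "c"))); // false
    }
}
```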
One possible approach: use a ParDo that outputs KVs, emitting each value with a unique String key derived from the complicated conditional logic you mentioned.
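As a rough sketch of the first approach, the key derivation could look like the following. `canonicalKey` is a hypothetical helper, and the `|` separator is an assumption that it never occurs inside a key. In a pipeline it would be called from the DoFn that emits the KVs. Note that this only works when "same key" means "same set of strings"; it does not express the overlap condition.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

class CanonicalKeyDemo {
    // Hypothetical helper: sort and de-duplicate the keys, then join them,
    // so that any two lists with the same elements yield the same String.
    public static String canonicalKey(List<String> keys) {
        TreeSet<String> sorted = new TreeSet<>(keys);
        return String.join("|", sorted);
    }

    public static void main(String[] args) {
        System.out.println(canonicalKey(Arrays.asList("b", "a", "b"))); // a|b
        System.out.println(canonicalKey(Arrays.asList("a", "b")));      // a|b
    }
}
```

A String key avoids the coder problem because Beam's standard String coder (`StringUtf8Coder`) is deterministic.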
The other approach is to use a custom object type for your key instead of a String, as you have tried so far. You'll need to implement a CustomCoder that produces byte-equivalent encodings for any two of your objects that represent the same key.
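The core of such a coder is an encoding that does not depend on incidental details such as list order. The sketch below shows only that encoding logic in plain Java, assuming that two keys are "the same" when they contain the same strings. The class and method names are hypothetical. In Beam, this logic would go in the `encode` and `decode` methods of a `CustomCoder<Multikey>` subclass, which would also need to override `verifyDeterministic()` so that it does not throw.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class DeterministicEncodingDemo {
    // Sort a copy of the keys before writing, so insertion order
    // never affects the resulting bytes.
    public static byte[] encode(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(sorted.size());
            for (String key : sorted) {
                out.writeUTF(key);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads back the size and then each key; keys come back in sorted order.
    public static List<String> decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            List<String> keys = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                keys.add(in.readUTF());
            }
            return keys;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] first = encode(Arrays.asList("x", "y"));
        byte[] second = encode(Arrays.asList("y", "x"));
        System.out.println(Arrays.equals(first, second)); // true
        System.out.println(decode(second));               // [x, y]
    }
}
```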
See the Apache Beam documentation on specifying coders.
There is also a blog post with a few custom coder examples.
Also, regarding the exception itself: see the verifyDeterministic docs for the description of a deterministic coder. It is likely that you have violated this constraint.
Upvotes: 3