Reputation: 660
I am trying to filter a BatchStage, but it fails with the error below:
java.lang.IllegalArgumentException: "filterFn" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:203)
at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachFilter(ComputeStageImplBase.java:154)
at com.hazelcast.jet.impl.pipeline.BatchStageImpl.filter(BatchStageImpl.java:105)
at com.aiv.hazelcast.JoinData.join(JoinData.java:121)
at com.aiv.hazelcast.Application.lambda$1(Application.java:93)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
at com.aiv.hazelcast.Application.runProcess(Application.java:90)
at com.aiv.hazelcast.Application.setUp(Application.java:44)
at com.aiv.hazelcast.Application.main(Application.java:26)
Caused by: java.io.NotSerializableException: com.hazelcast.com.fasterxml.jackson.jr.ob.impl.DeferredMap
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1387)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:201)
My code is below. What am I doing wrong?
This version works:
BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
bd.filter(k -> {
    return true;
}).writeTo(Sinks.logger());
This version, strangely, does not work:
BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
bd.filter(k -> {
    return filterItems((List<Map<String, Object>>) rules.get("criteria"));
}).writeTo(Sinks.logger());

public static boolean filterItems(List<Map<String, Object>> innerrules) {
    return true;
}
I don't know why it fails, please help me out. If I remove the filter, the pipeline works properly, but I need the filter.
Upvotes: 0
Views: 256
Reputation: 10812
We serialize the Pipeline and send it to the cluster for execution. This means that everything a lambda captures must be serializable. In your case, your lambda captures a local variable that is not serializable. Likely it is the rules map; the stack trace points to a DeferredMap, which is not serializable. You need to copy the data to some serializable structure, e.g. to a HashMap. Or extract the value before the lambda, like this:
List<Map<String, Object>> criteria = (List<Map<String, Object>>) rules.get("criteria");
bd.filter(k -> {
    return filterItems(criteria);
})
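This will work if the captured criteria value, including everything it contains, is serializable and the filterItems method is static. If the method is not static, this (the containing class instance) will be captured as well and must be serializable too.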
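To see the capture rules in isolation, here is a minimal sketch that uses only the JDK, no Hazelcast. Everything in it is an assumption made for illustration: SerializablePredicate stands in for the serializable function type that filter expects, NonSerializableMap stands in for a map like DeferredMap, and copyMap is a hypothetical helper that copies nested maps and lists into HashMap and ArrayList. It is not the Jet API, only a way to check what a lambda drags along when it is serialized.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class CaptureDemo {
    /** Hypothetical stand-in for a serializable predicate type. */
    interface SerializablePredicate<T> extends Predicate<T>, Serializable {}

    /** Hypothetical stand-in for a map that does not implement Serializable. */
    static class NonSerializableMap extends AbstractMap<String, Object> {
        private final Map<String, Object> delegate = new HashMap<>();
        @Override public Object put(String key, Object value) { return delegate.put(key, value); }
        @Override public Set<Map.Entry<String, Object>> entrySet() { return delegate.entrySet(); }
    }

    /** Copies a map into a HashMap, converting nested maps and lists as well. */
    static Map<String, Object> copyMap(Map<?, ?> source) {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<?, ?> e : source.entrySet()) {
            copy.put(String.valueOf(e.getKey()), copyValue(e.getValue()));
        }
        return copy;
    }

    static Object copyValue(Object value) {
        if (value instanceof Map) return copyMap((Map<?, ?>) value);
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object element : (List<?>) value) copy.add(copyValue(element));
            return copy;
        }
        return value; // assumed to be a String, Number, Boolean or null
    }

    /** Static method: the lambda captures only the rules argument. */
    static SerializablePredicate<Object> predicateOver(Map<String, Object> rules) {
        return k -> rules.containsKey("criteria");
    }

    /** Instance method: calling it from a lambda captures `this`. */
    boolean instanceCheck(Object k) { return true; }
    SerializablePredicate<Object> viaInstanceMethod() { return k -> instanceCheck(k); }

    /** Tries Java serialization, roughly what a serializability check does. */
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> rules = new NonSerializableMap();
        rules.put("criteria", new ArrayList<>(List.of(new NonSerializableMap())));

        System.out.println(isSerializable(predicateOver(rules)));                  // false
        System.out.println(isSerializable(predicateOver(copyMap(rules))));         // true
        System.out.println(isSerializable(new CaptureDemo().viaInstanceMethod())); // false
    }
}
```

Note that copying only the outer map would not be enough here: the nested list still holds non-serializable maps, which is why the sketch copies recursively.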
BTW, I wonder why you don't use k in your filter implementation at all; this is likely a mistake.
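For illustration, a filter that actually looks at the item could be sketched as below. The rule shape is purely an assumption, since the question does not show what the criteria contain: each rule is taken to be a map with hypothetical "field" and "value" keys, and each item is taken to be a Map of column names to values. With BatchStage<Object> you would have to cast k to that type first.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class CriteriaFilter {
    /**
     * Returns true when the item satisfies every rule.
     * Assumed rule shape (hypothetical): {"field": <column name>, "value": <expected value>}.
     */
    static boolean filterItems(Map<String, Object> item, List<Map<String, Object>> criteria) {
        for (Map<String, Object> rule : criteria) {
            Object field = rule.get("field");
            if (!Objects.equals(item.get(field), rule.get("value"))) {
                return false; // the item fails this rule
            }
        }
        return true; // no rule rejected the item (also true for empty criteria)
    }

    public static void main(String[] args) {
        Map<String, Object> item = Map.of("country", "IN", "age", 30);
        List<Map<String, Object>> criteria = List.of(Map.of("field", "country", "value", "IN"));
        System.out.println(filterItems(item, criteria)); // true
    }
}
```

Inside the pipeline this would be called as filterItems(item, criteria) from the lambda, with criteria being a serializable copy extracted before the lambda.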
Upvotes: 1