user3458271

Reputation: 660

Hazelcast jet flatMap says: These transforms have nothing attached to them: [flat-map]

I am trying to filter a BatchStage, but it is not working; it gives me the error below:

 java.lang.IllegalArgumentException: "filterFn" must be serializable
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:203)
    at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachFilter(ComputeStageImplBase.java:154)
    at com.hazelcast.jet.impl.pipeline.BatchStageImpl.filter(BatchStageImpl.java:105)
    at com.aiv.hazelcast.JoinData.join(JoinData.java:121)
    at com.aiv.hazelcast.Application.lambda$1(Application.java:93)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at com.aiv.hazelcast.Application.runProcess(Application.java:90)
    at com.aiv.hazelcast.Application.setUp(Application.java:44)
    at com.aiv.hazelcast.Application.main(Application.java:26)
Caused by: java.io.NotSerializableException: com.hazelcast.com.fasterxml.jackson.jr.ob.impl.DeferredMap
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1387)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:201)

My code is below. What am I doing wrong?

This is the working version:

BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
bd.filter(k -> {
    return true;
}).writeTo(Sinks.logger());

This is the one that is not working, which is weird:

BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
bd.filter(k -> {
    return filterItems((List<Map<String, Object>>) rules.get("criteria"));
}).writeTo(Sinks.logger());

public static boolean filterItems(List<Map<String, Object>> innerrules) {
    return true;
}

I don't know why it is not working; please help me out. If I remove the filter it works properly, but I need to filter.

Upvotes: 0

Views: 256

Answers (1)

Oliv

Reputation: 10812

We serialize the Pipeline and send it to the cluster for execution. This means that everything the lambda captures must be serializable. In your case, the lambda captures a local variable that is not serializable - most likely the rules map (the stack trace shows a jackson-jr DeferredMap). You need to copy the data into a serializable structure, e.g. a HashMap, or extract the value before the lambda, like this:

List<Map<String, Object>> criteria = (List<Map<String, Object>>) rules.get("criteria");
bd.filter(k -> {
    return filterItems(criteria);
})

This will work if the captured criteria list (and the maps inside it) is serializable and the filterItems method is static - if it's not static, this (the containing class instance) will also be captured.
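Since the stack trace shows the criteria are jackson-jr DeferredMap instances, which are not serializable, here is a minimal sketch of copying them into plain HashMaps before capturing them in the lambda (it assumes rules is a Map<String, Object> whose "criteria" entry is a List<Map<String, Object>>, that the keys and values themselves are serializable, and uses java.util.ArrayList and java.util.HashMap; the helper name toSerializableCriteria is made up for illustration):

// Copy the non-serializable DeferredMap entries into plain HashMaps so the
// lambda captures only serializable data.
@SuppressWarnings("unchecked")
public static List<Map<String, Object>> toSerializableCriteria(Object raw) {
    List<Map<String, Object>> copy = new ArrayList<>();
    for (Map<String, Object> m : (List<Map<String, Object>>) raw) {
        copy.add(new HashMap<>(m)); // HashMap is serializable if its contents are
    }
    return copy;
}

// Usage before building the filter stage:
List<Map<String, Object>> criteria = toSerializableCriteria(rules.get("criteria"));
bd.filter(k -> filterItems(criteria))
  .writeTo(Sinks.logger());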

BTW, I wonder why you don't use k in your filter implementation at all; this is likely a mistake.
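If the intent is to filter each incoming item against the criteria, a hypothetical variant that actually uses k could look like the sketch below (the two-argument filterItems signature is an assumption, not part of the original code):

// Pass the current item to the filter so it is actually used.
bd.filter(k -> filterItems(k, criteria))
  .writeTo(Sinks.logger());

// Hypothetical signature matching the call above; the real matching
// logic against innerRules would go here.
public static boolean filterItems(Object item, List<Map<String, Object>> innerRules) {
    return true;
}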

Upvotes: 1
