Reputation: 23
I use an AtomicLongMap that I create in the constructor of a RichMapFunction, like this:
public class PathAnalysis extends RichMapFunction<ApiLog, ApiLog> {
    private final AtomicLongMap<Object> mObjectAtomicLongMap;

    public PathAnalysis() {
        mObjectAtomicLongMap = AtomicLongMap.create();
    }
}
I registered a custom serializer class, but it does not work:
env.getConfig().registerTypeWithKryoSerializer(AtomicLongMap.class, new AtomicLongMapSerializer());
It causes:
org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1550)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:184)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:528)
at com.ghzs.Topology.main(Topology.java:91)
Caused by: java.io.NotSerializableException: org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.AtomicLongMap
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 4 more
AtomicLongMap does not implement Serializable. How can I register a custom serializer that actually works?
Upvotes: 0
Views: 1378
Reputation: 9245
Mark AtomicLongMap as transient, and allocate it in the open() call that your function will receive (because it's a RichMapFunction). So something like:
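import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import com.google.common.util.concurrent.AtomicLongMap;
public class PathAnalysis extends RichMapFunction<ApiLog, ApiLog> {
    // transient: skipped by Java serialization when the function object is shipped
    private transient AtomicLongMap<Object> mObjectAtomicLongMap;
    public PathAnalysis() { }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // created after deserialization, before any call to map()
        mObjectAtomicLongMap = AtomicLongMap.create();
    }
    @Override
    public ApiLog map(ApiLog value) throws Exception {
        return value; // placeholder: put the real mapping logic here
    }
}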
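For background on why the Kryo registration has no effect here: the stack trace in the question shows the function object itself going through plain Java serialization (ObjectOutputStream, called from ClosureCleaner), whereas a registered Kryo serializer applies to the data types flowing through the job. The effect of transient can be reproduced without Flink. This is only a minimal sketch with hypothetical stand-in classes: Counter plays the role of AtomicLongMap, and EagerFunction / LazyFunction play the two versions of PathAnalysis. None of these names come from Flink or Guava.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TransientFieldDemo {
    // Stand-in for AtomicLongMap: deliberately does NOT implement Serializable.
    static class Counter {
        long value;
    }

    // Mirrors the question: non-transient field initialised at construction time.
    static class EagerFunction implements Serializable {
        private final Counter counter = new Counter();
    }

    // Mirrors the answer: transient field, initialised later in open().
    static class LazyFunction implements Serializable {
        private transient Counter counter;

        void open() {
            counter = new Counter();
        }

        long map() {
            return ++counter.value;
        }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(new EagerFunction())); // false
        System.out.println(isJavaSerializable(new LazyFunction()));  // true
    }
}
```

The transient field is simply skipped when the object is written, so it has to be (re)created on the receiving side, which is what open() is for in a rich function.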
Upvotes: 3
Reputation: 13
Try implementing the function like this:
import org.apache.flink.api.common.functions.RichMapFunction;
import com.google.common.util.concurrent.AtomicLongMap;
public class PathAnalysis extends RichMapFunction<ApiLog, ApiLog> {
    private final AtomicLongMap<Object> mObjectAtomicLongMap;

    public PathAnalysis() {
        mObjectAtomicLongMap = AtomicLongMap.create();
    }

    @Override
    public ApiLog map(ApiLog value) throws Exception {
        // placeholder: return the mapped record here
        return null;
    }
}
Upvotes: 0