the100rabh
the100rabh

Reputation: 4147

Hazelcast throws java.io.NotSerializableException for Predicate based supplier for Aggregate

Below is the code for my attempt to run aggregates on hazelcast using Hazelcast Client. The first aggregate works out just fine, but the 2nd one throws an java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment

My code:

public class AggregateExperiment {
    public void runTest(){
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addAddress("127.0.0.1:5701");
        clientConfig.setClassLoader(this.getClass().getClassLoader());
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        IMap<String, Integer> map = client.getMap("customers");
        Supplier<String, Integer, Integer> supplier = Supplier.all();
// Choose the average aggregation
        Aggregation<String, Integer, Integer> aggregation = Aggregations.integerAvg();
        int average  =  map.aggregate(supplier, aggregation);
        System.out.println("Average of inputs = "+average);

        supplier = Supplier.fromKeyPredicate(new MyKeyPredicate());
// Choose the sum aggregation
        aggregation = Aggregations.integerSum();
        average  =  map.aggregate(supplier, aggregation );
        System.out.println("Average of inputs = "+average);

    }

    public class MyKeyPredicate implements KeyPredicate<String> {
        public boolean evaluate(String key) {
            return Integer.parseInt(key) % 4 == 0;
        }
    }
}

Error messages :

Average of inputs = 1
[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.hazelcast.core.HazelcastException: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:988)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
    at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
    at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:188)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.access$000(ClientInvocationFuture.java:41)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture$1.run(ClientInvocationFuture.java:234)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
    at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at com.hazelcast.nio.serialization.SerializationServiceImpl.handleException(SerializationServiceImpl.java:380)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:307)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.KeyPredicateSupplier.writeData(KeyPredicateSupplier.java:79)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper.writeData(SupplierConsumingMapper.java:75)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.writeData(ClientMapReduceRequest.java:204)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.write(ClientMapReduceRequest.java:173)
    at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86)
    at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207)
    at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104)
    at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnRandomTarget(ClientSmartInvocationServiceImpl.java:60)
    at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:163)
    at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:147)
    at com.hazelcast.client.proxy.ClientMapReduceProxy$ClientJob.invoke(ClientMapReduceProxy.java:124)
    at com.hazelcast.mapreduce.impl.AbstractJob.submit(AbstractJob.java:119)
    at com.hazelcast.mapreduce.impl.AbstractJob$ReducingSubmittableJobImpl.submit(AbstractJob.java:348)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:985)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
    at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
    at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
    at ------ End remote and begin local stack-trace ------.(Unknown Source)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175)
    ... 8 more
Caused by: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at com.hazelcast.nio.serialization.DefaultSerializers$ObjectSerializer.write(DefaultSerializers.java:223)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.KeyPredicateSupplier.writeData(KeyPredicateSupplier.java:79)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper.writeData(SupplierConsumingMapper.java:75)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.writeData(ClientMapReduceRequest.java:204)
    at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.write(ClientMapReduceRequest.java:173)
    at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86)
    at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53)
    at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207)
    at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104)
    at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnRandomTarget(ClientSmartInvocationServiceImpl.java:60)
    at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:163)
    at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:147)
    at com.hazelcast.client.proxy.ClientMapReduceProxy$ClientJob.invoke(ClientMapReduceProxy.java:124)
    at com.hazelcast.mapreduce.impl.AbstractJob.submit(AbstractJob.java:119)
    at com.hazelcast.mapreduce.impl.AbstractJob$ReducingSubmittableJobImpl.submit(AbstractJob.java:348)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:985)
    at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
    at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
    at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)

Any help is much appreciated. Thanks

Upvotes: 1

Views: 2138

Answers (2)

the100rabh
the100rabh

Reputation: 4147

Hazelcast as of now does not support distributed classloading which would be needed for the code above to work properly.

An issue is github is already opened for the same https://github.com/hazelcast/hazelcast/issues/7394

For now, those who really want to use Hazelcast in this manner can explore https://github.com/serkan-ozal/hermgen as well

Upvotes: 0

kostya
kostya

Reputation: 9559

Instances of the inner class MyKeyPredicate contain an implicit reference to parent AggregateExperiment instance. This makes MyKeyPredicate not serializable. You should make MyKeyPredicate static:

public static class MyKeyPredicate implements KeyPredicate<String> {
    ...

Upvotes: 2

Related Questions