Reputation: 4147
Below is the code for my attempt to run aggregates on hazelcast using Hazelcast Client. The first aggregate works out just fine, but the 2nd one throws an java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
My code:
public class AggregateExperiment {
public void runTest(){
ClientConfig clientConfig = new ClientConfig();
clientConfig.addAddress("127.0.0.1:5701");
clientConfig.setClassLoader(this.getClass().getClassLoader());
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
IMap<String, Integer> map = client.getMap("customers");
Supplier<String, Integer, Integer> supplier = Supplier.all();
// Choose the average aggregation
Aggregation<String, Integer, Integer> aggregation = Aggregations.integerAvg();
int average = map.aggregate(supplier, aggregation);
System.out.println("Average of inputs = "+average);
supplier = Supplier.fromKeyPredicate(new MyKeyPredicate());
// Choose the sum aggregation
aggregation = Aggregations.integerSum();
average = map.aggregate(supplier, aggregation );
System.out.println("Average of inputs = "+average);
}
public class MyKeyPredicate implements KeyPredicate<String> {
public boolean evaluate(String key) {
return Integer.parseInt(key) % 4 == 0;
}
}
}
Error messages :
Average of inputs = 1
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.hazelcast.core.HazelcastException: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:988)
at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
... 6 more
Caused by: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:188)
at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160)
at com.hazelcast.client.spi.impl.ClientInvocationFuture.access$000(ClientInvocationFuture.java:41)
at com.hazelcast.client.spi.impl.ClientInvocationFuture$1.run(ClientInvocationFuture.java:234)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
at com.hazelcast.nio.serialization.SerializationServiceImpl.handleException(SerializationServiceImpl.java:380)
at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:307)
at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
at com.hazelcast.mapreduce.aggregation.impl.KeyPredicateSupplier.writeData(KeyPredicateSupplier.java:79)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
at com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper.writeData(SupplierConsumingMapper.java:75)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.writeData(ClientMapReduceRequest.java:204)
at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.write(ClientMapReduceRequest.java:173)
at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86)
at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62)
at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53)
at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227)
at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207)
at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104)
at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnRandomTarget(ClientSmartInvocationServiceImpl.java:60)
at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:163)
at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:147)
at com.hazelcast.client.proxy.ClientMapReduceProxy$ClientJob.invoke(ClientMapReduceProxy.java:124)
at com.hazelcast.mapreduce.impl.AbstractJob.submit(AbstractJob.java:119)
at com.hazelcast.mapreduce.impl.AbstractJob$ReducingSubmittableJobImpl.submit(AbstractJob.java:348)
at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:985)
at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
at ------ End remote and begin local stack-trace ------.(Unknown Source)
at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175)
... 8 more
Caused by: java.io.NotSerializableException: co.near.hazelcast.AggregateExperiment
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.hazelcast.nio.serialization.DefaultSerializers$ObjectSerializer.write(DefaultSerializers.java:223)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
at com.hazelcast.mapreduce.aggregation.impl.KeyPredicateSupplier.writeData(KeyPredicateSupplier.java:79)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
at com.hazelcast.mapreduce.aggregation.impl.SupplierConsumingMapper.writeData(SupplierConsumingMapper.java:75)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305)
at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315)
at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.writeData(ClientMapReduceRequest.java:204)
at com.hazelcast.mapreduce.impl.client.ClientMapReduceRequest.write(ClientMapReduceRequest.java:173)
at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86)
at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62)
at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53)
at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37)
at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227)
at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207)
at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104)
at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnRandomTarget(ClientSmartInvocationServiceImpl.java:60)
at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:163)
at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:147)
at com.hazelcast.client.proxy.ClientMapReduceProxy$ClientJob.invoke(ClientMapReduceProxy.java:124)
at com.hazelcast.mapreduce.impl.AbstractJob.submit(AbstractJob.java:119)
at com.hazelcast.mapreduce.impl.AbstractJob$ReducingSubmittableJobImpl.submit(AbstractJob.java:348)
at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:985)
at com.hazelcast.client.proxy.ClientMapProxy.aggregate(ClientMapProxy.java:960)
at co.near.hazelcast.AggregateExperiment.runTest(AggregateExperiment.java:31)
at co.near.hazelcast.MainExperiment.main(MainExperiment.java:131)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Any help is much appreciated. Thanks
Upvotes: 1
Views: 2138
Reputation: 4147
Hazelcast as of now does not support distributed classloading which would be needed for the code above to work properly.
An issue is github is already opened for the same https://github.com/hazelcast/hazelcast/issues/7394
For now, those who really want to use Hazelcast in this manner can explore https://github.com/serkan-ozal/hermgen as well
Upvotes: 0
Reputation: 9559
Instances of the inner class MyKeyPredicate
contain an implicit reference to parent AggregateExperiment
instance. This makes MyKeyPredicate
not serializable. You should make MyKeyPredicate
static:
public static class MyKeyPredicate implements KeyPredicate<String> {
...
Upvotes: 2