Carbon
Carbon

Reputation: 3943

Ignite: Simple Stream to Server Issue

I have the following two scripts, which follow almost exactly the stream example. The source of the code is here: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java

However, though the client connects nicely to the server, when running the stream, I get dreadful errors on the server side - have I failed to configure something properly?

[12:41:43] (err) Failed to execute compound future reducer: GridCompoundFuture [rdc=null, initFlag=1, lsnrCalls=0, done=false, cancelled=false, err=null, futs=[true, true, true, true, false]]class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=befcb4b8-3262-4d16-be65-c9377b033245]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:1857)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:336)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
    at java.lang.Thread.run(Thread.java:748)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to unmarshal object with optimized marshaller
    at org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9801)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:289)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
    ... 6 more
Caused by: class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1786)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1962)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1714)
    at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:310)
    at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal0(BinaryMarshaller.java:99)
    at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:82)
    at org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9795)
    ... 9 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to deserialize object with given class loader: [clsLdr=sun.misc.Launcher$AppClassLoader@18b4aac2, err=java.lang.reflect.InvocationTargetException]
    at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:235)
    at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:94)
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1783)
    ... 15 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:611)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObject0(OptimizedObjectInputStream.java:346)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:199)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:421)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:513)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:601)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:927)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObject0(OptimizedObjectInputStream.java:346)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:199)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:421)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:227)
    ... 17 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:606)
    ... 28 more
Caused by: java.lang.NoSuchMethodException: Test.$deserializeLambda$(java.lang.invoke.SerializedLambda)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
    at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
    ... 32 more

Server:

import org.apache.ignite.Ignition;
import org.apache.ignite.Ignite;

public class Test {
    public static void main(String[] args)
    {
        Ignite ignite = Ignition.start();
    }
}

Client:

import java.util.List;
import java.util.Random;

import javax.cache.processor.MutableEntry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.stream.StreamTransformer;

public class Test {
    private static final String CACHE_NAME = "randomNumbers";
    private static final Random RAND = new Random();
    private static final int RANGE = 1000;
    public static void main(String[] args)
    {
        Ignition.setClientMode(true);
        try(Ignite ignite = Ignition.start()) {
            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(CACHE_NAME);
            cfg.setIndexedTypes(Integer.class, Long.class);

            try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) {
                try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
                    stmr.allowOverwrite(true);

                    stmr.receiver(StreamTransformer.from((e, arg) -> {
                        Long val = e.getValue();
                        e.setValue(val == null ? 1L : val + 1);
                        return null;
                    }));

                    // Stream 10 million of random numbers into the streamer cache.
                    for (int i = 1; i <= 1_000; i++) {
                        stmr.addData(RAND.nextInt(RANGE), 1L);

                        if (i % 500_000 == 0)
                            System.out.println("Number of tuples streamed into Ignite: " + i);
}}}}}}

Upvotes: 1

Views: 686

Answers (1)

Michael
Michael

Reputation: 650

To make it work, you need to replace the lambda with a static nested class.

In the examples, the server and client share the same classpath, so the server can access the Test class that is required for deserialization. You can find more details in the following thread: Unable to deserialize lambda

Upvotes: 3

Related Questions