Jarrod Sears

Reputation: 1131

Java Apache Storm Spout empty deserialized LinkedList object attribute

I have a spout class that has several integer and string attributes, which are serialized/deserialized as expected. The class also has one LinkedList holding byte arrays, and this LinkedList is always empty when the object is deserialized.

I've added log statements to all of the spout methods and can see the spout's 'activate' method being called, after which the LinkedList is empty. I do not see any logs for the 'deactivate' method when this happens.

It seems odd that the spout 'activate' method is being called without the 'deactivate' method having been called. When the 'activate' method is called, there has not been any resubmission of the topology.

I also have a log statement in the spout constructor, which is not called prior to the LinkedList being emptied.

I've also verified repeatedly that there are no calls anywhere within the spout class to any method that would completely empty the LinkedList. There is one spot that uses the poll method, which is immediately followed by a log statement recording the new LinkedList size.

I found this reference, which points to Kryo being used for Serialization, but it may just be for serializing tuple data. http://storm.apache.org/documentation/Serialization.html

Storm uses Kryo for serialization. Kryo is a flexible and fast serialization library that produces small serializations.

By default, Storm can serialize primitive types, strings, byte arrays, ArrayList, HashMap, HashSet, and the Clojure collection types. If you want to use another type in your tuples, you'll need to register a custom serializer.

The article makes it sound like Kryo may be just for serializing and passing tuples, but if it is used for the Spout object as well, I can't figure out how to keep using a LinkedList, since ArrayList and HashMap aren't really good alternatives for a FIFO queue. Will I have to roll my own LinkedList?
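If the concern is only that Storm's default Kryo config can handle ArrayList but not LinkedList, one hedged workaround (a sketch, not something the Storm docs prescribe) is to keep the LinkedList as the working FIFO queue and copy it into an ArrayList only at the point where the data has to cross a serialization boundary:

```java
import java.util.ArrayList;
import java.util.LinkedList;

public class QueueCopyDemo {
    public static void main(String[] args) {
        // FIFO queue of raw message payloads, as in the spout.
        LinkedList<byte[]> messages = new LinkedList<>();
        messages.add(new byte[] {1, 2});
        messages.add(new byte[] {3, 4});

        // Copy into an ArrayList, which Storm's default Kryo
        // configuration can serialize, before handing the data off.
        ArrayList<byte[]> snapshot = new ArrayList<>(messages);

        System.out.println(snapshot.size());     // 2
        System.out.println(messages.poll()[0]);  // 1 -- FIFO order preserved
    }
}
```

The copy is O(n) per handoff, so this only makes sense for small queues; it does not address why the list is emptied in the first place.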

public class MySpout extends BaseRichSpout
{

    private SpoutOutputCollector _collector;
    private LinkedList<byte[]> messages = new LinkedList<byte[]>();

    public MySpout()
    {
        messages = new LinkedList<byte[]>();
    }

    public void add(byte[] message)
    {
        messages.add(message);
    }

    @Override
    public void open( Map conf, TopologyContext context,
            SpoutOutputCollector collector )
    {
        _collector = collector;

        try
        {           
            Logger.getInstance().addMessage("Opening Spout");
            // ####### Open client connection here to read messages
        }
        catch (MqttException e)
        {
            e.printStackTrace();
        }
    }

    @Override
    public void close()
    {
        Logger.getInstance().addMessage("Close Method Called!!!!!!!!!!!!!!!!!");
    }

    @Override
    public void activate()
    {
        Logger.getInstance().addMessage("Activate Method Called!!!!!!!!!!!!!!!!!");
    }

    @Override
    public void nextTuple()
    {

        if (!messages.isEmpty())
        {
            System.out.println("Tuple emitted from spout");            
            _collector.emit(new Values(messages.poll()));
            Logger.getInstance().addMessage("Tuple emitted from spout. Remaining in queue: " + messages.size());
            try
            {
                Thread.sleep(1);
            }
            catch (InterruptedException e)
            {
                // TODO Auto-generated catch block
                Logger.getInstance().addMessage("Sleep thread interrupted in nextTuple(). " + Logger.convertStacktraceToString(e));
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        // Declare the single field carried by the Values emitted in nextTuple()
        declarer.declare(new Fields("message"));
    }
}

EDIT:

Java Serialization of referenced objects is "losing values"? http://www.javaspecialists.eu/archive/Issue088.html

The above SO link and the Java Specialists article call out specific examples similar to what I am seeing, where the issue is due to the serialization/deserialization cache. But because Storm is doing that work, I'm not sure what can be done about the issue.
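The caching behavior the article describes can be reproduced with plain java.io, independent of Storm: ObjectOutputStream remembers objects it has already written and emits only a back-reference the second time, so mutations made between writes are silently lost on the reading side.

```java
import java.io.*;
import java.util.LinkedList;

public class SerializationCacheDemo {
    public static void main(String[] args) throws Exception {
        LinkedList<String> list = new LinkedList<>();
        list.add("first");

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(list);  // full serialization of the list
        list.add("second");     // mutate after the first write
        oos.writeObject(list);  // only a back-reference is written
        oos.close();

        ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        ois.readObject();
        @SuppressWarnings("unchecked")
        LinkedList<String> second = (LinkedList<String>) ois.readObject();

        System.out.println(second.size());  // 1 -- "second" was lost
    }
}
```

Calling oos.reset() between the two writes would force a fresh serialization and avoid the stale copy.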

At the end of the day, it also seems like the bigger issue is that Storm is suddenly serializing/deserializing the data in the first place.

EDIT:

Just prior to the Spout being activated, a good number of log messages come through in less than a second that read:

Executor MyTopology-1-1447093098:[X Y] not alive

After those messages, there is a log of:

Setting new assignment for topology id MyTopology-1-1447093098: #backtype.storm.daemon.common.Assignment{:master-code-dir ...

Upvotes: 0

Views: 542

Answers (1)

Matthias J. Sax

Reputation: 62310

If I understand your problem correctly, you instantiate your spout on the client side, add messages via addMessage(), give the spout to the TopologyBuilder via addSpout(), and afterwards submit the topology to your cluster? When the topology is started, you expect the spout's message list to contain the messages you added? If this is correct, your usage pattern is quite odd...

I guess the problem is related to Thrift, which is used to submit the topology to the cluster. Java serialization is not used, and I assume that the Thrift code does not serialize the actual spout object. As far as I understand the code, the topology jar is shipped as a binary, and the topology structure is shipped via Thrift. On the workers that execute the topology, new spout/bolt objects are created via new. Thus, no Java serialization/deserialization happens and your LinkedList is empty. Because of the call to new, it is of course not null either.

Btw: you are right about Kryo; it is only used to ship data (i.e., tuples).

As a workaround, you could add the LinkedList to the Map that is given to StormSubmitter.submitTopology(...). In Spout.open(...) you should get a correct copy of your messages from the Map parameter. However, as I mentioned already, your usage pattern is quite odd -- you might want to rethink it. In general, a spout should be implemented so that it can fetch its data in nextTuple() from an external data source.
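A minimal sketch of that handoff, using only the standard library: the key name "pending.messages" is made up for this example, and a Java-serialization round trip stands in for Storm actually shipping the conf map to the workers (Storm's transport may restrict which value types survive, so treat this as the shape of the idea, not guaranteed behavior).

```java
import java.io.*;
import java.util.*;

public class ConfHandoffDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        LinkedList<byte[]> messages = new LinkedList<>();
        messages.add("hello".getBytes());
        messages.add("world".getBytes());

        // Client side: stash a copy of the queue in the conf map
        // before the (simulated) submitTopology call.
        HashMap<String, Object> conf = new HashMap<>();
        conf.put("pending.messages", new ArrayList<>(messages));

        // Stand-in for shipping the conf map to a worker.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(conf);
        oos.close();
        Map<String, Object> shipped = (Map<String, Object>)
                new ObjectInputStream(
                        new ByteArrayInputStream(bos.toByteArray())).readObject();

        // Worker side, as in Spout.open(conf, ...): rebuild the queue.
        LinkedList<byte[]> rebuilt =
                new LinkedList<>((List<byte[]>) shipped.get("pending.messages"));

        System.out.println(rebuilt.size());              // 2
        System.out.println(new String(rebuilt.poll()));  // hello
    }
}
```

Rebuilding the LinkedList inside open() means the spout no longer depends on its own fields surviving submission, which is the crux of the answer.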

Upvotes: 0
