Anton Alfred

Reputation: 43

Storm 0.10.0 reuse a topology design?

Can the following design be accomplished in Storm?

Let's take the word count example from https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/WordCountTopology.java. I am changing the word generator spout to a file reader spout.

The design for this word count topology is:

1. A spout to read the file and emit sentences line by line
2. A bolt to split sentences into words
3. A bolt to aggregate unique words and emit each word with its corresponding count

So in a way, the topology describes the flow a file needs to take in order to count the unique words it contains.
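Concretely, the wiring I have in mind looks roughly like this (FileReaderSpout, SplitSentenceBolt, and WordCountBolt stand in for my own implementations; they are not classes from storm-starter):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // 1. spout reads the file (args[1] is the file path) and emits one sentence per line
        builder.setSpout("sentences", new FileReaderSpout(args[1]));
        // 2. bolt splits each sentence into words
        builder.setBolt("split", new SplitSentenceBolt()).shuffleGrouping("sentences");
        // 3. bolt counts words; fieldsGrouping routes equal words to the same task
        builder.setBolt("count", new WordCountBolt()).fieldsGrouping("split", new Fields("word"));
        StormSubmitter.submitTopology("WordCountTopology", new Config(), builder.createTopology());
    }
}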

If I have two files, file 1 and file 2, one should be able to call the same topology and create two instances of it to run the same word count.

In order to track whether the word count has indeed finished, each instance of the word count topology should reach a completed status once its file has been processed.

In the current design of Storm, I find that the topology is the actual instance, so it is like a task.

One needs to make two different calls with different topology names:

for file 1:

StormSubmitter.submitTopology("WordCountTopology1", conf, builder.createTopology());

for file 2:

StormSubmitter.submitTopology("WordCountTopology2", conf, builder.createTopology());

not to mention uploading the same jar twice using the storm client:

storm jar stormwordcount-1.0.0-jar-with-dependencies.jar com.company.WordCount1Main.App "server" "filepath1"

storm jar stormwordcount-1.0.0-jar-with-dependencies.jar com.company.WordCount2Main.App "server" "filepath2"

The other issue is that the topologies don't complete once the file is processed. They stay alive until we issue a kill on the topology:

storm kill "WordCountTopology"

I understand that in a streaming world, where the messages come from a message queue like Kafka, there is no end of messages. But how is that relevant in the file world, where the entities/messages are fixed?

Is there an API that does the following?

// creates the topology; this is done one time, using storm to upload the respective jars
StormSubmitter.submitTopology("WordCountTopology", conf, builder.createTopology());

Once uploaded, the application code just instantiates the topology with the arguments:

// creates an instance of the topology and gives back a status tracker
JobTracker tracker = StormSubmitter.runTopology("WordCountTopology", conf, args);

// can query Storm whether the current job is complete or not
JobStatus status = StormSubmitter.getTopologyStatus(conf, tracker);

Upvotes: 0

Views: 115

Answers (2)

Anton Alfred

Reputation: 43

The word count topology mentioned in the post doesn't do justice to the power of Storm. Since Storm is a stream processor, it requires a stream; period. Files, by definition, are static. I empathize with the Storm developers: how can a simple "hello world" showcase the topology concepts? So a non-stream entity like a file was chosen. For newbies learning Storm, as I was at that time, it was difficult to understand how to develop against this example. The example is just a way to show how Storm's concepts work, not a real-world application of how files would arrive or need to be processed.

So here is a take on what one solution could look like.

Since topologies run all the time, they can compute the word count for as long as one wants, i.e., within a file or across all files, for any period of time.

In order to allow different files to come in, we need a streaming spout. So naturally you would need a Kafka message broker or similar to receive the files as a stream. Depending on the size of the files and the restrictions brokers impose (Kafka, for example, has a default 1 MB message-size limit), we can choose to send the file itself as the payload, or only a reference to the file, in which case you would need a distributed file system to store it, such as Hadoop HDFS or a NAS.

We then read these files using a Kafka spout instead of a FileSpout.
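A rough sketch of wiring such a spout with the storm-kafka module (the ZooKeeper address, topic name, and component ids here are made up):

import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaFileTopology {
    public static TopologyBuilder wireSpout() {
        // ZooKeeper address, topic, zk root, and spout id are placeholders
        BrokerHosts hosts = new ZkHosts("zookeeper:2181");
        SpoutConfig spoutConf = new SpoutConfig(hosts, "files", "/kafka-files", "file-spout");
        // each Kafka message carries the file payload (or a reference to it) as a string
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("file-spout", new KafkaSpout(spoutConf));
        return builder;
    }
}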

We now have the following issues:

1. Word count across files
2. Word count per file
3. Running status on the word count until it is processed
4. When do we know if a file is processed or complete?

  1. Word Count Across Files

Using the example provided, this is the use case the example targets: if we continue to stream the files, and for each file we read the lines, split out the words, and send them to the other bolts, the bolts will count the words independent of which file they came from.

File1: A quick brown fox jumped ...
File2: Once upon a time a fox ...

Fields grouping:

quick
brown
fox
...
Once
upon
fox (not needed, as it already came in file 1)
...

  2. Word Count Per File

In order to do this, the fields grouping on words now needs to be appended with the fileId. So the example needs to change to include a fileId for each word it splits.

File1: A quick brown fox jumped ...
File2: Once upon a time a fox ...

So the fields grouping on words would be (dropping the noise words):

File1_quick File1_brown File1_fox

File2_once File2_upon File2_fox
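A sketch of what that fileId-aware split could look like (the bolt class, the field names, and PerFileWordCountBolt are assumptions for illustration):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// splits a line into words, tagging each word with the file it came from
public class FileAwareSplitBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String fileId = input.getStringByField("fileId");
        for (String word : input.getStringByField("line").split("\\s+")) {
            collector.emit(new Values(fileId, word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("fileId", "word"));
    }
}

The counting bolt would then be wired with a fields grouping on both fields, so each (file, word) pair always lands on the same task:

builder.setBolt("count", new PerFileWordCountBolt()).fieldsGrouping("split", new Fields("fileId", "word"));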

  3. Running Status on the Word Count Until It Is Processed

Since all these counts are in the bolt's memory and we don't know the EoF, there is no way to get the status unless someone peeks into the bolt, or we send the counts periodically to another data store where we can query them. This is exactly what we need to do: at periodic intervals, persist the in-memory bolt counts to a data store like HBase, Elasticsearch, MongoDB, etc.
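One way to implement that periodic flush is Storm's built-in tick tuples; here is a sketch (persistCounts() stands in for whichever store you pick):

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

// counts (fileId, word) pairs in memory and flushes them on every tick tuple
public class FlushingCountBolt extends BaseBasicBolt {
    private final Map<String, Long> counts = new HashMap<String, Long>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        // ask Storm to send this bolt a tick tuple every 10 seconds
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (Constants.SYSTEM_COMPONENT_ID.equals(input.getSourceComponent())) {
            // tick tuple: persist the current snapshot to HBase/Elasticsearch/MongoDB/...
            persistCounts(counts);
        } else {
            String key = input.getStringByField("fileId") + "_" + input.getStringByField("word");
            Long old = counts.get(key);
            counts.put(key, old == null ? 1L : old + 1L);
        }
    }

    private void persistCounts(Map<String, Long> snapshot) {
        // hypothetical: batch-write the snapshot to the chosen data store
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing emitted downstream in this sketch
    }
}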

  4. When Do We Know If a File Is Processed or Complete?

Perhaps this is the toughest question to answer in the streaming world. Basically, the stream processor doesn't know when the stream is finished; from its perspective, the streams are files coming in, which it needs to split into words and count in the corresponding bolts. The components don't know what happened before or after a tuple reached them. So this entire thing needs to be done by the app developer. One way to do it is to count the total words as each file is read and send a message:

File 1 : Total Words : 1000
File 2 : Total Words : 2000

Now when we do the word count and collect the per-file words (File1_*), the sum of the individual word counts should match the announced total before we say a file is complete. All of this is custom logic we need to write ourselves.
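A minimal sketch of that completion bookkeeping (every name in it is made up for illustration):

import java.util.HashMap;
import java.util.Map;

// a file is "done" once the words counted for it add up to the total announced by the reader
public class FileCompletionTracker {
    private final Map<String, Long> announcedTotals = new HashMap<String, Long>();
    private final Map<String, Long> countedSoFar = new HashMap<String, Long>();

    // called when the "File X : Total Words : N" message arrives
    public void announceTotal(String fileId, long totalWords) {
        announcedTotals.put(fileId, totalWords);
    }

    // called for every word counted for a file
    public void addCounted(String fileId, long words) {
        Long old = countedSoFar.get(fileId);
        countedSoFar.put(fileId, old == null ? words : old + words);
    }

    public boolean isComplete(String fileId) {
        Long total = announcedTotals.get(fileId);
        return total != null && total.equals(countedSoFar.get(fileId));
    }
}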

So in essence, Storm provides the framework to do stream processing in a variety of ways. It's the application developer's job to work with the design it has and implement their own logic depending on the use case. Storm doesn't provide application use cases out of the box, nor a good reference implementation, which I think we need to build ourselves, as Storm is not a commercial product and depends on the community to champion it.

Upvotes: 0

Matthias J. Sax

Reputation: 62285

For reusing the same topology twice, you have two possibilities:

1) Use a constructor parameter for your file spout and instantiate the same topology twice with different parameters:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class WordCountSubmitter {
    private static StormTopology createMyTopology(String filename) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("File Spout", new FileSpout(filename));
        // add further spouts and bolts etc.
        return builder.createTopology();
    }

    public static void main(String[] args) throws Exception {
        String file1 = "/path/to/file1";
        String file2 = "/path/to/file2";
        Config c = new Config();
        // decide which file to process, e.g., via a command line argument
        boolean useFile1 = args.length > 0 && args[0].equals("file1");
        if (useFile1) {
            StormSubmitter.submitTopology("T1", c, createMyTopology(file1));
        } else {
            StormSubmitter.submitTopology("T2", c, createMyTopology(file2));
        }
    }
}

2) As an alternative, you could configure your file spout in its open() method:

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;

public class FileSpout extends BaseRichSpout {
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        String filename = (String)conf.get("FILENAME");
        // ...
    }
    // other methods omitted
}

public static void main(String[] args) throws Exception {
    String file1 = "/path/to/file1";
    String file2 = "/path/to/file2";
    Config c = new Config();
    // decide which file to process, e.g., via a command line argument
    boolean useFile1 = args.length > 0 && args[0].equals("file1");
    if (useFile1) {
        c.put("FILENAME", file1);
    } else {
        c.put("FILENAME", file2);
    }

    // assemble topology...
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("FileSpout", new FileSpout());

    StormSubmitter.submitTopology("T", c, builder.createTopology());
}

For your second question: there is no API in Storm that terminates a topology automatically. You could use TopologyInfo and monitor the number of tuples emitted by the spout. If it does not change for some time, you can assume that the whole file has been read, and then kill the topology.

import backtype.storm.Config;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.utils.NimbusClient;

Config cfg = new Config();
// set Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT in cfg
Nimbus.Client client = NimbusClient.getConfiguredClient(cfg).getClient();
// note: getTopologyInfo() expects the topology *id* (look it up via getClusterInfo()), not the name
TopologyInfo info = client.getTopologyInfo("topology-id");
// get emitted tuples...
client.killTopology("topologyName");
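To make the monitoring concrete, here is a sketch that sums the spout's emitted-tuple counters via the Nimbus thrift API (the spout id is yours to fill in, and the ":all-time" window key is an assumption about how the stats map is keyed):

import java.util.Map;

import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologyInfo;

public class SpoutProgress {
    // sums the tuples emitted by all executors of the given spout component;
    // if the sum stops growing between two polls, the file is presumably read
    public static long spoutEmitted(Nimbus.Client client, String topologyId, String spoutId) throws Exception {
        TopologyInfo info = client.getTopologyInfo(topologyId);
        long emitted = 0;
        for (ExecutorSummary exec : info.get_executors()) {
            if (!spoutId.equals(exec.get_component_id()) || exec.get_stats() == null) {
                continue;
            }
            // stats are keyed by time window, then by stream id
            Map<String, Long> allTime = exec.get_stats().get_emitted().get(":all-time");
            if (allTime != null) {
                for (Long count : allTime.values()) {
                    emitted += count;
                }
            }
        }
        return emitted;
    }
}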

Upvotes: 0
