Krakenudo
Krakenudo

Reputation: 309

Apache Storm: Topology submission exception: [x] subscribes from non-existent stream

Sorry if the question is solved, but I tried to find it and I haven't had success. There are some similar, but I don't found help where I've seen. I have the next problem:

603  [main] WARN  b.s.StormSubmitter - Topology submission exception: 
    Component: [escribirFichero] subscribes from non-existent stream: 
               [default] of component [buscamosEnKlout]
Exception in thread "main" java.lang.RuntimeException: 
    InvalidTopologyException(msg:Component: 
               [escribirFichero] subscribes from non-existent stream: 
                   [default] of component [buscamosEnKlout])

I can't understand why I have this exception. I declare the bolt "buscamosEnKlout" before I use "escribirFichero". Next to my topology I'll put the elemental lines of the bolts. I know the spout is OK,because a trial-and-error approach.

The code of my topology is:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.stats.RollingWindow;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import bolt.*;
import spout.TwitterSpout;
import twitter4j.FilterQuery;

public class TwitterTopologia {
    private static String consumerKey = "xxx1";
    private static String consumerSecret = "xxx2";
    private static String accessToken = "yyy1";
    private static String accessTokenSecret="yyy2";

    public static void main(String[] args) throws Exception {
        /**************** SETUP ****************/
        String remoteClusterTopologyName = null;
        if (args!=null) { ... } 

        TopologyBuilder builder = new TopologyBuilder();
        FilterQuery tweetFilterQuery = new FilterQuery();
        tweetFilterQuery.track(new String[]{"Vacaciones","Holy Week", "Semana Santa","Holidays","Vacation"});
        tweetFilterQuery.language(new String[]{"en","es"});


        TwitterSpout spout = new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, tweetFilterQuery);

        KloutBuscador buscamosEnKlout = new KloutBuscador();
        FileWriterBolt fileWriterBolt = new FileWriterBolt("idUsuarios.txt");

        builder.setSpout("spoutLeerTwitter",spout,1);
        builder.setBolt("buscamosEnKlout",buscamosEnKlout,1).shuffleGrouping("spoutLeerTwitter");
        builder.setBolt("escribirFichero",fileWriterBolt,1).shuffleGrouping("buscamosEnKlout");


        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("twitter-fun", conf, builder.createTopology());
            Thread.sleep(460000);
            cluster.shutdown();
        }
    }
}

Bolt "KloutBuscador", alias "buscamosEnKlout", is the next code:

String text = tuple.getStringByField("id");

String cadenaUrl;

cadenaUrl = "http://api.klout.com/v2/identity.json/twitter?screenName=";
cadenaUrl += text.replaceAll("\\[", "").replaceAll("\\]","");
cadenaUrl += "&key=" + kloutKey;
URL url = new URL(cadenaUrl);
HttpURLConnection c = (HttpURLConnection) url.openConnection();
        ...........c.setRequestMethod("GET");c.setRequestProperty("Content-length", "0");c.setUseCaches(false);c.setAllowUserInteraction(false);c.connect();
int status = c.getResponseCode();
StringBuilder sb = new StringBuilder();
switch (status) {
    case 200:
    case 201:
       BufferedReader br = new BufferedReader(new InputStreamReader(c.getInputStream()));
       String line;
       while ((line = br.readLine()) != null) sb.append(line + "\n");
           br.close();
       }

JSONObject jsonResponse = new JSONObject(sb.toString());
//getJSONArray("id");
String results = jsonResponse.toString();
_collector.emit(new Values(text,results));

And the second bolt, fileWriterBolt, alias "escribirFichero", is the next one:

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    _collector = outputCollector;
    try {
        writer = new PrintWriter(filename, "UTF-8");...}...}

    public void execute(Tuple tuple) {
        writer.println((count++)+":::"+tuple.getValues());

 //+"+++"+tweet.getUser().getId()+"__FINAL__"+tweet.getUser().getName()
        writer.flush();
        // Confirm that this tuple has been treated.
        //_collector.ack(tuple);

    }

If I pass over the bolt of Klous and only write the result of the spout, it works. I don't understand why the Klous's bolt causes this failure

Upvotes: 3

Views: 1510

Answers (1)

Stig Rohde Døssing
Stig Rohde Døssing

Reputation: 3651

Your buscamosEnKlout bolt needs to declare the format of the tuples it will emit, as well as which streams it will emit to. You most likely haven't implemented declareOutputFields correctly in that bolt. It should contain something like declarer.declare(new Fields("your-text-field", "your-results-field"))

Upvotes: 3

Related Questions