Emanuel Barac

Reputation: 31

Indexing tuples from Storm to Elasticsearch with the elasticsearch-hadoop library does not work

I want to index documents into Elasticsearch from Storm, but I couldn't get any document to be indexed.

In my topology I have a KafkaSpout that emits JSON like { "tweetId": 1, "text": "hello" } to an EsBolt, the native bolt from the elasticsearch-hadoop library that writes Storm tuples to Elasticsearch (documentation: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html). These are the configs for my EsBolt:

Map conf = new HashMap();
conf.put("es.nodes", "127.0.0.1");        // Elasticsearch host (the default)
conf.put("es.port", "9200");              // Elasticsearch HTTP port (the default)
conf.put("es.resource", "twitter/tweet"); // target index/type
conf.put("es.index.auto.create", "no");   // the index is created beforehand
conf.put("es.input.json", "true");        // tuples already carry JSON strings
conf.put("es.mapping.id", "tweetId");     // use the tweetId field as the document id
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);

The first two configurations have these values by default, but I chose to set them explicitly. I have also tried without them, getting the same result.

And this is how I build my topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
        .setNumTasks(kafkaSpoutNumberOfTasks);


builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
        .setNumTasks(elasticsearchBoltNumberOfTasks)
        .shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);

return builder.createTopology();

Before I run the topology locally, I create the "twitter" index in Elasticsearch and a "tweet" mapping for it. This is what I get if I retrieve the mapping for the newly created type (curl -XGET 'http://localhost:9200/twitter/_mapping/tweet'):

{
   "twitter": {
      "mappings": {
         "tweet": {
            "properties": {
               "text": {
                  "type": "string"
               },
               "tweetId": {
                  "type": "string"
               }
            }
         }
      }
   }
}
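For reference, this is roughly how that index and mapping can be created beforehand (a sketch assuming Elasticsearch 2.x, matching the string field types shown above):

curl -XPUT 'http://localhost:9200/twitter' -d '{
  "mappings": {
    "tweet": {
      "properties": {
        "text":    { "type": "string" },
        "tweetId": { "type": "string" }
      }
    }
  }
}'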

I run the topology locally and this is what I get in my console when processing a tuple:

Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]

TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]

BOLT ack TASK: 6 TIME:  TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:

So the tuples seem to be processed. However, I don't have any documents indexed in Elasticsearch.

I suppose I am doing something wrong when I set the configurations for EsBolt; perhaps I am missing a configuration.

Upvotes: 3

Views: 1322

Answers (2)

zhongyin miao

Reputation: 1

I had the same issue. Looking through the es-hadoop documentation, I found that I had not set the frequency that triggers a queue flush. After adding the es.storm.bolt.flush.entries.size configuration to my Storm topology, it worked. However, when I set a value for Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, the bolt's execute function throws java.lang.RuntimeException: java.lang.NullPointerException. Testing the topology in debug mode, I found that the input tuple in execute contains no entries, yet this empty tuple still triggers the bolt. That is what confuses me: won't a tuple be emitted at the configured interval even though it is empty, once Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS is set? I think this is a bug.

For more information, see https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html
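The empty tuple is Storm's tick tuple: once Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS is set at the topology level, Storm delivers a system tuple with no user fields to every bolt at that interval. A minimal sketch of the usual guard in a custom bolt (assuming Storm 1.x package names; TickAwareBolt is a hypothetical name):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

// Hypothetical bolt showing the standard guard against tick tuples.
public class TickAwareBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            // Tick tuples arrive on a system stream and carry no user fields;
            // ack them and skip normal processing.
            collector.ack(tuple);
            return;
        }
        // ... regular tuple processing goes here ...
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this sketch emits nothing
    }
}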

Upvotes: 0

PhaedrusTheGreek

Reputation: 584

Documents will only be indexed once you reach the flush size, specified by es.storm.bolt.flush.entries.size.

Alternatively, you may set a tick frequency that triggers a queue flush:

config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);

By default, es-hadoop flushes on tick, as per the es.storm.bolt.tick.tuple.flush parameter.
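Putting the two together, a minimal sketch (assuming Storm 1.x's org.apache.storm.Config and es-hadoop's org.elasticsearch.storm.EsBolt, with the same target as in the question; a flush size of 1 only makes sense for local testing):

Map conf = new HashMap();
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
// Flush to Elasticsearch after every queued document (useful when testing locally).
conf.put("es.storm.bolt.flush.entries.size", "1");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);

// Topology-level tick frequency: every bolt, including EsBolt, receives a tick
// tuple at this interval, and es-hadoop flushes its queue on tick by default.
Config config = new Config();
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);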

Upvotes: 1
