kuti

Reputation: 211

"index_already_exists_exception" - kafka to Elastic Search(SSL)

We have a Kafka Elasticsearch sink connector transporting data to Elasticsearch (v5.6.3).

I'm using Confluent v5.0.0 and I don't see any other error. I've deleted the index and restarted the Elasticsearch connector, but I'm still getting the same error.

The connector configuration is as follows:

    {
      "name": "elasticsearch_topic",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "3",
        "topics": "sample_topic",
        "connection.url": "https://127.0.0.1:9200,https://127.0.0.2:9200",
        "connection.username": "elsatic_user",
        "connection.password": "elastic_user",
        "type.name": "log",
        "flush.timeout.ms": "60000",
        "connection.timeout.ms": "60000",
        "read.timeout.ms": "60000",
        "batch.size": "20",
        "topic.index.map": "sample_topic:elastic_search_index_test",
        "transforms": "extract,insertenv,inserttimestamp,convert_current_ts,routeTS",
        "schema.ignore": "true",
        "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extract.field": "RE_NUM",
        "transforms.insertenv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertenv.static.field": "_env",
        "transforms.insertenv.static.value": "dev",
        "transforms.inserttimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.inserttimestamp.timestamp.field": "date_time",
        "transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convert_current_ts.target.type": "Timestamp",
        "transforms.convert_current_ts.field": "date_time",
        "transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
        "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
        "transforms.routeTS.topic.format": "elastic_search_index_test-${timestamp}",
        "transforms.routeTS.timestamp.format": "yyyyMMdd"
      }
    }

So far so good. No issues.

Recently we've enabled SSL on Elasticsearch, and for this I've added the "connection.username" and "connection.password" settings and switched the URLs to "https" in the configuration above, then restarted the connector and worker. Since then I've been seeing "index_already_exists_exception", with the error below:

[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} 
 Task threw an uncaught and unrecoverable exception 
 (org.apache.kafka.connect.runtime.WorkerTask:177)
 org.apache.kafka.connect.errors.ConnectException: Could not create index 
 'elastic_search_index_test': {"root_cause": 
 [{"type":"index_already_exists_exception","reason":"index 
[elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}],"type":"index_already_exists_exception","reason":"index [elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:238)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:330)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:157)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 [2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

Steps I've tried so far:

  1. Stopped the Elasticsearch sink connector and worker
  2. Deleted the index "elastic_search_index_test" from Elasticsearch (through Kibana)
  3. Restarted the worker and the Elasticsearch connector

But I'm still getting the same error (as described above).
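To rule out a half-finished delete between steps 2 and 3, one quick sanity check is a HEAD request against the index. A minimal sketch (host, index name, and credentials are the ones from the config above; it assumes the JVM trusts the cluster's SSL certificate):

    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class CheckIndexGone {
        public static void main(String[] args) throws Exception {
            // HEAD /<index> returns 200 if the index exists and 404 once the
            // delete has really gone through (ES 5.x REST API).
            URL url = new URL("https://127.0.0.1:9200/elastic_search_index_test");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("HEAD");
            String auth = Base64.getEncoder().encodeToString(
                "elsatic_user:elastic_user".getBytes(StandardCharsets.UTF_8));
            conn.setRequestProperty("Authorization", "Basic " + auth);
            int status = conn.getResponseCode();
            System.out.println(status == 404 ? "index is gone"
                : "index still exists (HTTP " + status + ")");
        }
    }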

Could anyone suggest what's going wrong?

Thanks in advance!!

Upvotes: 2

Views: 1326

Answers (1)

Manjeet Duhan

Reputation: 117

This is a very common error when you start a connector with multiple tasks ("tasks.max": "3" in the current case).

Internal steps in kafka-connect-elasticsearch:

  1. kafka-connect-elasticsearch checks whether the index already exists in ES
  2. it creates the index if it is missing

Problem:

This connector runs with 3 tasks (i.e. 3 threads executing the same code), so more than one task can find that the index does not exist and go on to create it. The first task succeeds, and the second throws index_already_exists_exception because the index was already created by the first task.
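In other words, the failure is a plain check-then-create race. A minimal, self-contained simulation of the pattern (an illustration only, not the connector's actual code; the Set stands in for the cluster's index catalog):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class CheckThenCreateRace {
        // Hypothetical stand-in for the cluster's index catalog.
        static final Set<String> indices = ConcurrentHashMap.newKeySet();

        static void openTask(String index) throws Exception {
            // Non-atomic check-then-create: with 3 tasks, several can pass
            // the "missing" check before any of them has created the index.
            if (!indices.contains(index)) {
                Thread.sleep(10); // widen the race window for the demo
                // add() returns false when another task won the race; the real
                // connector surfaces this as index_already_exists_exception.
                if (!indices.add(index)) {
                    System.out.println(Thread.currentThread().getName()
                        + ": index [" + index + "] already exists");
                    return;
                }
                System.out.println(Thread.currentThread().getName()
                    + ": created " + index);
            }
        }

        public static void main(String[] args) {
            for (int t = 0; t < 3; t++) { // tasks.max = 3
                new Thread(() -> {
                    try { openTask("elastic_search_index_test"); }
                    catch (Exception e) { e.printStackTrace(); }
                }).start();
            }
        }
    }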

Solutions:

  1. Start the connector with a single task, "tasks.max": "1" (a bad option if you have a large volume of data)

  2. Create the index in ES before running the connector (see the sketch after this list)

  3. Use a distributed lock (e.g. ZooKeeper) around the index creation
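For option 2, a minimal sketch of pre-creating the index over the REST API (host, index name, and credentials are taken from the question's config; the JVM must trust the cluster's SSL certificate, and the settings body is just a placeholder):

    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class CreateIndexFirst {
        public static void main(String[] args) throws Exception {
            // PUT /<index> creates the index in ES 5.x.
            URL url = new URL("https://127.0.0.1:9200/elastic_search_index_test");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("PUT");
            String auth = Base64.getEncoder().encodeToString(
                "elsatic_user:elastic_user".getBytes(StandardCharsets.UTF_8));
            conn.setRequestProperty("Authorization", "Basic " + auth);
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            // Minimal body; add explicit mappings here if you need them.
            conn.getOutputStream().write(
                "{\"settings\":{\"number_of_shards\":1}}"
                    .getBytes(StandardCharsets.UTF_8));
            // 200 = created; a 400 with index_already_exists_exception just
            // means the index is already there, which is fine here.
            System.out.println("HTTP " + conn.getResponseCode());
        }
    }

One caveat: the question's config also uses TimestampRouter, which produces a new date-suffixed index name each day, so pre-creating indices by hand may not cover every index the connector touches.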

Upvotes: 1
