Reputation: 211
We have a Kafka Elasticsearch sink connector transporting data to Elasticsearch (v5.6.3). I'm using Confluent v5.0.0 and I don't see any other errors.
The connector configuration is as follows:
{
  "name": "elasticsearch_topic",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "sample_topic",
    "connection.url": "https://127.0.0.1:9200,https://127.0.0.2:9200",
    "connection.username": "elastic_user",
    "connection.password": "elastic_user",
    "type.name": "log",
    "flush.timeout.ms": "60000",
    "connection.timeout.ms": "60000",
    "read.timeout.ms": "60000",
    "batch.size": "20",
    "topic.index.map": "sample_topic:elastic_search_index_test",
    "transforms": "extract,insertenv,inserttimestamp,convert_current_ts,routeTS",
    "schema.ignore": "true",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract.field": "RE_NUM",
    "transforms.insertenv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertenv.static.field": "_env",
    "transforms.insertenv.static.value": "dev",
    "transforms.inserttimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.inserttimestamp.timestamp.field": "date_time",
    "transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_current_ts.target.type": "Timestamp",
    "transforms.convert_current_ts.field": "date_time",
    "transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.routeTS.topic.format": "elastic_search_index_test-${timestamp}",
    "transforms.routeTS.timestamp.format": "yyyyMMdd"
  }
}
So far so good. No issues.
Recently we enabled SSL on Elasticsearch, and for this I added "connection.username", "connection.password", and the "https" scheme to the above configuration, then restarted the connector and the worker. Since then I've been seeing an "index_already_exists_exception", with the error as below:
[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1}
Task threw an uncaught and unrecoverable exception
(org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Could not create index
'elastic_search_index_test': {"root_cause":
[{"type":"index_already_exists_exception","reason":"index
[elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}],"type":"index_already_exists_exception","reason":"index [elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:238)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:330)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:157)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
Steps that I've tried so far: I deleted the index and restarted the Elasticsearch connector, but I'm still getting the same error (as described above).
Could anyone suggest what's going wrong?
Thanks in advance!!
Upvotes: 2
Views: 1326
Reputation: 117
This is a very common error when you start a connector with multiple tasks ("tasks.max": "3" in this case).
For background, see the internal steps of kafka-connect-elasticsearch.
Problem:
The connector is running with 3 tasks (meaning 3 threads executing the same code), and more than one task found that the index did not exist and went on to create it. The first task succeeds, and the others throw an index_already_exists_exception because the index has already been created by the first task.
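Here is a minimal, self-contained sketch of that race (not the connector's actual code; indexExists and createIndex are hypothetical stand-ins for the real client calls), plus an idempotent variant that treats "already exists" as success:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IndexCreationRace {
    // Simulates the cluster's set of existing indices.
    private final Set<String> indices = ConcurrentHashMap.newKeySet();

    boolean indexExists(String index) { return indices.contains(index); }

    void createIndex(String index) {
        // Set.add returns false if the index is already present,
        // mirroring index_already_exists_exception on a real cluster.
        if (!indices.add(index)) {
            throw new IllegalStateException("index [" + index + "] already exists");
        }
    }

    // Racy check-then-act: two tasks can both see "false" from
    // indexExists, and the slower one then fails in createIndex.
    void ensureIndexRacy(String index) {
        if (!indexExists(index)) {
            createIndex(index);
        }
    }

    // Idempotent: attempt the create and swallow "already exists".
    void ensureIndexIdempotent(String index) {
        try {
            createIndex(index);
        } catch (IllegalStateException alreadyExists) {
            // Another task won the race; the index exists, which is the goal.
        }
    }
}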
Solution:
1. Start the connector with one task, "tasks.max": "1" (a bad option if you have huge data volumes).
2. Create the index in Elasticsearch before running the connector (see the sketch below).
3. Use a distributed lock (e.g., ZooKeeper) so that only one task attempts the creation.
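For option 2, here is a hedged sketch that pre-creates the index over the Elasticsearch REST API, assuming the URL, credentials, and index name from the question's configuration (adjust them to your cluster, and note that the JVM truststore must trust the cluster's SSL certificate):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PreCreateIndex {
    public static void main(String[] args) throws Exception {
        // Basic auth header built from the connector's connection.username/password.
        String auth = Base64.getEncoder()
                .encodeToString("elastic_user:elastic_user".getBytes(StandardCharsets.UTF_8));
        URL url = new URL("https://127.0.0.1:9200/elastic_search_index_test");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            // Empty body uses default settings; add settings/mappings here if needed.
            os.write("{}".getBytes(StandardCharsets.UTF_8));
        }
        // 200 means created; 400 with index_already_exists_exception means it was
        // already there. Either way the connector can start safely afterwards.
        System.out.println("PUT returned HTTP " + conn.getResponseCode());
    }
}

With the index already present, every task's existence check succeeds and no task ever issues a create, so the race cannot occur.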
Upvotes: 1