Reputation: 165
I have brought in data from twitter source connector in my Kafka topic "demo-twitter-status"
I want to sink this in ElasticSearch. The connector I made is like this:
curl -k -X POST http://xxxxxxxxxx:8083/connectors -H "Content-Type: application/json" --data '{
"name":"sink-elasticsearch-connector1",
"config":{
"tasks.max": "2",
"topics":"demo-twitter-status",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schema.enable":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.enable":"true",
"connection.url":"http://xxxxxxxx:9200",
"type.name":"kafka-connect",
"key.ignore":"true"
}
}'
this is giving error as:
{
"error_code": 400,
"message": "Connector config {type.name=kafka-connect, key.converter.schema.enable=true, tasks.max=2, topics=demo-twitter-status,
value.converter.schema.enable=true, name=sink-elasticsearch-connector1, value.converter=org.apache.kafka.connect.json.JsonConverter,
connection.url=http://===========:9200, key.ignore=true, key.converter=org.apache.kafka.connect.json.JsonConverter}
contains no connector type"
}
I have used "type.name":"kafkaconnect" and "type.name":"_doc"
also but still the same error appears.
Can anyone please help me correct my connector. Thanks in advance!!!!
Versions are as follows:
Confluent 5.4.0
ElasticSearch 7.6
PS: The confluent docs says that 7.x is supported for confluent 5.4.0
Upvotes: 1
Views: 640
Reputation: 32090
The cause is shown in your error:
Connector config … contains no connector type
You need to specify connector.class so that Kafka Connect knows to use the Elasticsearch sink connector.
Try
curl -k -X POST http://xxxxxxxxxx:8083/connectors -H "Content-Type: application/json" --data '{
"name":"sink-elasticsearch-connector1",
"config":{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "2",
"topics":"demo-twitter-status",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schema.enable":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.enable":"true",
"connection.url":"http://xxxxxxxx:9200",
"type.name":"kafka-connect",
"key.ignore":"true"
}
}'
Upvotes: 2