Oles Rid

Reputation: 93

Kafka elasticsearch connector - 'Flush timeout expired with unflushed records:'

I have a strange problem with the kafka -> elasticsearch connector. The first time I started it, everything worked fine: new data arrived in elasticsearch and I could see it in a kibana dashboard. But when I produced new data into kafka with the same producer application and tried to start the connector again, no new data appeared in elasticsearch. Now I'm getting errors like this:

[2018-02-04 21:38:04,987] ERROR WorkerSinkTask{id=log-platform-elastic-0} Commit of offsets threw an unexpected exception for sequence number 14: null (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 15805

I'm using the following command to run the connector:

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties log-platform-elastic.properties

connect-avro-standalone.properties:

bootstrap.servers=kafka-0.kafka-hs:9093,kafka-1.kafka-hs:9093,kafka-2.kafka-hs:9093
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
#rest.host.name=
rest.port=8084
#rest.advertised.host.name=
#rest.advertised.port=
plugin.path=/usr/share/java

and log-platform-elastic.properties:

name=log-platform-elastic
key.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log, order_history_sync_log # ... and many others
key.ignore=true
connection.url=http://elasticsearch:9200
type.name=log

I checked the connections to the kafka brokers, elasticsearch and the schema-registry (the schema-registry and the connector are on the same host at the moment) and everything is fine. The kafka brokers are running on port 9093 and I'm able to read data from the topics using kafka-avro-console-consumer. I'll be grateful for any help with this!

Upvotes: 4

Views: 4132

Answers (2)

Nitin

Reputation: 3832

You can optimize the Elasticsearch connector configuration to solve the issue. Please refer to the link below for the configuration options:

https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/configuration_options.html

Below are the key parameters that control the message flow rate and can eventually help solve the issue (see the example snippet after this list):

flush.timeout.ms: increasing it gives the flush more breathing room

The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail.

max.buffered.records: try reducing the buffered-record limit

The maximum number of records each task will buffer before blocking acceptance of more records. This config can be used to limit the memory usage for each task

batch.size: try reducing the batch size

The number of records to process as a batch when writing to Elasticsearch

tasks.max: the number of parallel threads (consumer instances); reduce or increase it. If Elasticsearch can't handle the bandwidth, reducing the number of tasks may help.
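For example, a tuned sink connector configuration could look like this. The values below are only illustrative assumptions, not recommendations; pick numbers that match your own throughput and Elasticsearch capacity:

name=log-platform-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log
connection.url=http://elasticsearch:9200
type.name=log
key.ignore=true
# illustrative tuning values - adjust for your own load
flush.timeout.ms=60000
max.buffered.records=10000
batch.size=500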

Tuning the above parameters fixed the issue for me.

Upvotes: 0

Ehud Lev

Reputation: 2901

Just increase flush.timeout.ms to something bigger than 10000 (10 seconds, which is the default).

According to the documentation:

flush.timeout.ms The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail.

Type: long Default: 10000 Importance: low

See documentation
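For example, you could add the following line to the connector properties file (log-platform-elastic.properties in the question); 60000 is just an assumed example value, not a prescribed one:

# raise the flush timeout from the 10 s default - 60000 ms is only an example
flush.timeout.ms=60000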

Upvotes: 2
