xxestter

Reputation: 509

Elasticsearch Bulk API

I would like to ask about the Elasticsearch Bulk API.

This is my code for using the Bulk API:

public void bulkInsert(String index, ArrayList<String> jsonList) throws IOException {
    BulkRequest request = new BulkRequest(); 

    for(String json: jsonList){
        if (json != null && !json.isEmpty()) {
            request.add(new IndexRequest(index)  
                    .source(json, XContentType.JSON));  
        }
    }

    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    for (BulkItemResponse bulkItemResponse : bulkResponse) { 
        DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

        switch (bulkItemResponse.getOpType()) {
        case INDEX:    
        case CREATE:
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE:   
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE:   
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
        }
    }
    if (bulkResponse.hasFailures()) { 
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) { 
                BulkItemResponse.Failure failure =
                        bulkItemResponse.getFailure(); 

                System.out.println("failed: " + failure.getId());

            }
        }
    }
}

I have encountered a timeout exception now that my data set has grown to 800k records: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]

I tried to break up the jsonList that I passed in, but sometimes I still get the same error.

I am currently using Elasticsearch 7.6.2 version.

The exception trace:

java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:808)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
    at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:497)
    at com.ESUtil.bulkInsert(ESUtil.java:110)
    at org.download.App1.main(App1.java:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:58)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.lang.Thread.run(Unknown Source)

Upvotes: 1

Views: 4354

Answers (1)

Amit

Reputation: 32376

You are using the bulk API to send a huge amount of data to Elasticsearch, the default connection timeout is 30 seconds, and Elasticsearch isn't able to finish this huge bulk operation within 30 seconds, hence the exception.

This timeout is normal for huge bulk requests, and in your case (which is indexing-specific) you can do the following:

Upgrade the infrastructure and speed up indexing

Scale your cluster, i.e. add more CPU, more memory, and better disks, and disable refresh_interval (the default is 1 sec) to speed up your bulk indexing.

Increase the Bulk API timeout duration

As mentioned in the official ES doc:

request.timeout(TimeValue.timeValueMinutes(2));   --> 2 min timeout
request.timeout("2m");  --> string format of the same 2 min timeout
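Note that request.timeout(...) only controls how long Elasticsearch itself waits server-side; the SocketTimeoutException in the question is raised by the low-level HTTP client, whose own socket timeout can be raised when building the client. A sketch of that configuration (the host, port, and 2-minute value are illustrative choices, not from the original post):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

// Raise the low-level client's socket timeout, which is what actually
// produces the "30,000 milliseconds timeout" SocketTimeoutException.
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http"))
                .setRequestConfigCallback(requestConfigBuilder ->
                        requestConfigBuilder
                                .setConnectTimeout(5000)       // 5 s to establish the connection
                                .setSocketTimeout(120000)));   // allow 2 min of socket inactivity
```

With both the client-side socket timeout and the server-side request.timeout raised, a long-running bulk request has time to complete instead of being cut off at 30 seconds.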

Edit: As asked in the comment, you can use the synchronous execution of the bulk API if you want to check the response of your bulk request immediately; below is quoted from the same doc:

Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
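Independently of the timeouts, the "break up the jsonList" approach from the question is still worth keeping: sending bounded batches keeps each BulkRequest small enough to finish well within the timeout. A minimal sketch of such a batching helper (the BulkBatcher class name and batch size are illustrative, not from the original post):

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBatcher {
    // Split the JSON documents into fixed-size batches; each batch can then
    // be sent as its own BulkRequest so no single request grows unbounded.
    public static List<List<String>> partition(List<String> jsonList, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int from = 0; from < jsonList.size(); from += batchSize) {
            int to = Math.min(from + batchSize, jsonList.size());
            batches.add(new ArrayList<>(jsonList.subList(from, to)));
        }
        return batches;
    }
}
```

Each batch returned by partition(jsonList, 1000) can then be passed to the existing bulkInsert logic, e.g. indexing 800k records as 800 requests of 1,000 documents each instead of one enormous request.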

Upvotes: 1
