Reputation: 661
Recently, I wanted to re-scroll old index data into new monthly-based indices. The stored data begins in 2015/07 and runs until now, with almost 30,000 records per month. Following the scroll and bulk methods provided in the 2.2 API, I wrote the code below.
file main.coffee

logger = require 'graceful-logger'
elasticsearch = require 'elasticsearch'
setMonthlyIndices = require './es-test-promise'

client = new elasticsearch.Client
  host:
    host: 'localhost'
    port: 9200
    protocol: 'http'

setMonthlyIndices client, 'es_test_messages', 'talk_messages_v2', 'messages', 2015, 6
file es-test-promise.coffee

logger = require 'graceful-logger'
elasticsearch = require 'elasticsearch'
config = require 'config'

setIndice = (client, prefix, index, type, year, month) ->
  allDocs = []
  count = 0
  prevYear = year + ''
  # with leading '0' for month less than 10
  prevMonth = ("0" + month).slice(-2)
  nextDate = new Date(year, month)
  nextYear = nextDate.getFullYear().toString()
  nextMonth = ("0" + (nextDate.getMonth() + 1)).slice(-2)
  minDate = "#{prevYear}-#{prevMonth}-01"
  maxDate = "#{nextYear}-#{nextMonth}-01"
  indice_name = "#{prefix}_#{prevYear}_#{prevMonth}"

  q =
    filtered:
      filter:
        range:
          createdAt:
            gte: minDate
            lt: maxDate
            format: "yyyy-MM-dd"

  client.search
    index: index
    type: type
    scroll: '1m'
    body:
      query: q
      sort: ['_doc']
    size: 1000
  , callback = (err, response) ->
    console.log "indice_name 1", indice_name
    return logger.err err.stack if err
    return unless response.hits?.total
    allDocs = []
    response.hits.hits.forEach (hit) ->
      action =
        index:
          _id: hit._id
      allDocs.push(action)
      allDocs.push(hit._source)
    count = count + allDocs.length
    client.bulk
      index: indice_name
      type: type
      body: allDocs
    , (err, resp) ->
      console.log "indice_name 2", indice_name
      return logger.err err.stack if err
      if response.hits.total * 2 != count
        client.scroll
          scrollId: response._scroll_id
          scroll: '1m'
        , callback
      else
        logger.info "Finish indicing #{indice_name}"

setMonthlyIndices = (client, prefix, index, type, year, month) ->
  current = new Date()
  currentYear = current.getFullYear()
  currentMonth = current.getMonth() + 1
  processYear = year or currentYear
  processMonth = month or 0
  processDate = new Date(processYear, processMonth)
  currentDate = new Date(currentYear, currentMonth)
  processDate = new Date(2015, 6)
  currentDate = new Date(2015, 9)
  while processDate <= currentDate
    year = processDate.getFullYear()
    month = processDate.getMonth() + 1
    setIndice(client, prefix, index, type, year, month)
    processDate.setMonth(processDate.getMonth() + 1)

module.exports = setMonthlyIndices
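To make the bulk format explicit: the body I build alternates an action line with its source document. A small helper in plain JavaScript (names illustrative, same shape as the forEach above) would be:

```javascript
// Build a bulk body in the action/source pair format used above:
// [{index: {_id}}, source, {index: {_id}}, source, ...]
function buildBulkBody(hits) {
  const body = [];
  for (const hit of hits) {
    body.push({ index: { _id: hit._id } }); // action line
    body.push(hit._source);                 // document line
  }
  return body;
}
```

This is why the code compares `count` against `response.hits.total * 2`: each hit contributes two entries to the bulk body.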
I am wondering whether it is caused by opening too many client requests, because in es-test-promise.coffee all of these search requests run simultaneously. This is just a guess; I also tried an implementation with promises to make sure the requests execute one by one. In the end I couldn't figure it out and gave up.
Do you have any suggestions? I think it should be a source issue, but I don't know where to check...
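For reference, the sequential approach I was aiming for looks roughly like this in plain JavaScript (simplified sketch; `migrateMonth` is a hypothetical placeholder for the scroll-and-bulk work for one month):

```javascript
// Build the list of (year, month) pairs to process, oldest first.
// Months are 1-based here, matching the setIndice signature above.
function monthRange(startYear, startMonth, endYear, endMonth) {
  const months = [];
  const d = new Date(startYear, startMonth - 1);
  const end = new Date(endYear, endMonth - 1);
  while (d <= end) {
    months.push({ year: d.getFullYear(), month: d.getMonth() + 1 });
    d.setMonth(d.getMonth() + 1);
  }
  return months;
}

// Process months one at a time instead of firing every search at once.
// `migrateMonth(year, month)` must return a Promise that resolves when
// that month's scroll/bulk migration is fully done.
async function runSequentially(migrateMonth) {
  for (const { year, month } of monthRange(2015, 7, 2015, 10)) {
    await migrateMonth(year, month); // next month starts only after this one finishes
  }
}
```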
Upvotes: 1
Views: 6223
Reputation: 3845
Adding more clarity in case someone needs it, the following should help:
from elasticsearch import Elasticsearch, helpers

es_mi_prod = Elasticsearch(
    ["host"],
    scheme="https",
    port=443,
    # ssl_context=context,
    http_auth=("user_name", "pass"),
    timeout=120,
    max_retries=10,
    retry_on_timeout=True
)

helpers.bulk(
    connector,
    generator_function,
    chunk_size=500,
    request_timeout=120
)
Upvotes: 0
Reputation: 5852
In Elasticsearch 7.x the default timeout is 1m (one minute).
The official Go client for Elasticsearch, go-elasticsearch, provides a way to set this value:
// WithTimeout - explicit operation timeout.
//
func (f Bulk) WithTimeout(v time.Duration) func(*BulkRequest) {
	return func(r *BulkRequest) {
		r.Timeout = v
	}
}

esutil.BulkIndexerConfig{
	// ...
	Timeout: timeout,
}
Upvotes: 0
Reputation: 164
Just put the requestTimeout in your client config, e.g.:
new elasticsearch.Client({ host: "localhost", requestTimeout: Infinity });
You can replace Infinity with your desired limit in milliseconds.
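If you'd rather not change the client-wide default, the legacy `elasticsearch` JS client also accepts `requestTimeout` as a per-request parameter, so you can raise it only for the slow bulk calls. A hypothetical config-style sketch (index name and `bulkBody` are placeholders):

```javascript
// Override the timeout for this single bulk call only (milliseconds);
// other requests keep the client's default requestTimeout.
client.bulk({
  index: 'es_test_messages_2015_07',
  body: bulkBody,
  requestTimeout: 120000
});
```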
Upvotes: 3