Reputation: 615
When i try to send the bulk_data to the local elasticsearch, my data isn't loaded because of the SerializationError.
I already tried to fill the empty cells in the csv file, but that wasn't the solution.
from elasticsearch import Elasticsearch
bulk_data = []
header = []
count = 0
for row in csv_file_object:
if count > 0 :
data_dict = {}
for i in range(len(row)):
row = row.rstrip()
data_dict[header[i]] = row[i]
op_dict = {
"index": {
"_index": INDEX_NAME,
"_type": TYPE_NAME,
}
}
bulk_data.append(op_dict)
bulk_data.append(data_dict)
else:
header = row
count = count+1
# create ES client, create index
es = Elasticsearch(hosts = [ES_HOST])
if es.indices.exists(INDEX_NAME):
print("deleting '%s' index..." % (INDEX_NAME))
res = es.indices.delete(index = INDEX_NAME)
res = es.bulk(index = INDEX_NAME, body = bulk_data, refresh = True)
See image for the SerializationError and bulk_data values:
Please note: the \n is added by the serialization process itself.
Upvotes: 0
Views: 533
Reputation: 3222
I try to repond to you but I can't understand one thing. How you retrieve your field name from data? In your code I see that you retrieve it from a list called header
that is empty? I can't understand how you take this value.. Check my answer i don't know if i understand well
from elasticsearch import Elasticsearch
from elasticsearch import helpers
index_name = "your_index_name"
doc_type = "your_doc_type"
esConnector = Elasticsearch(["http://192.168.1.1:9200/"])
# change your ip here
count = 0
def generate_data(csv_file_object)
with open(csv_file_object, "r") as f:
for line in f:
line = line.split(",").rstrip()
data_dict = {header[count]: line}
obj={
'_op_type': 'index',
'_index': index_name,
'_type': doc_type,
'_id': count+1,
'_source': data_dict
}
count +=1
yield obj
for success, info in helpers.parallel_bulk(client=esConnector, actions=generate_data(csv_file_object), thread_count=4):
if not success:
print 'Doc failed', info
Upvotes: 1