JP..t

Reputation: 615

CSV to elasticsearch with python SerializationError

When I try to send the bulk_data to my local Elasticsearch, the data isn't loaded because of a SerializationError.

I have already tried filling the empty cells in the CSV file, but that wasn't the solution.
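For illustration, the cleaning went roughly along these lines (a sketch only; the file name and the "N/A" placeholder are made-up examples):

import csv

# Sketch: read the CSV and replace empty cells with a placeholder value.
# "data.csv" and "N/A" are example values, not the real ones.
with open("data.csv", newline="") as f:
    rows = [["N/A" if cell == "" else cell for cell in row]
            for row in csv.reader(f)]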

from elasticsearch import Elasticsearch

# csv_file_object, INDEX_NAME, TYPE_NAME and ES_HOST are defined earlier

bulk_data = []
header = []
count = 0
for row in csv_file_object:
    if count > 0:
        data_dict = {}
        for i in range(len(row)):
            row = row.rstrip()
            data_dict[header[i]] = row[i]
        op_dict = {
            "index": {
                "_index": INDEX_NAME,
                "_type": TYPE_NAME,
            }
        }
        bulk_data.append(op_dict)
        bulk_data.append(data_dict)
    else:
        header = row  # the first row holds the column names
    count = count + 1

# create ES client, create index
es = Elasticsearch(hosts=[ES_HOST])
if es.indices.exists(INDEX_NAME):
    print("deleting '%s' index..." % (INDEX_NAME))
    res = es.indices.delete(index=INDEX_NAME)

res = es.bulk(index=INDEX_NAME, body=bulk_data, refresh=True)
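For reference, bulk_data ends up as a flat list that alternates action metadata and document source (the column names here are hypothetical):

bulk_data = [
    {"index": {"_index": INDEX_NAME, "_type": TYPE_NAME}},  # action line
    {"name": "Alice", "age": "30"},                         # document line
    {"index": {"_index": INDEX_NAME, "_type": TYPE_NAME}},
    {"name": "Bob", "age": "25"},
]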

See the screenshot for the SerializationError traceback and the bulk_data values:

[image: SerializationError traceback and bulk_data values]

Please note: the \n is added by the serialization process itself.
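For context, here is a minimal sketch (not the client's actual implementation) of how the Python client serializes a bulk body into newline-delimited JSON. The joining step is where the \n comes from, and a value the JSON serializer cannot handle fails here with a SerializationError, before anything reaches Elasticsearch:

import json

def to_bulk_body(bulk_data):
    # one JSON line per action dict and per document dict,
    # joined with "\n" (the newlines seen in the error output)
    return "\n".join(json.dumps(item) for item in bulk_data) + "\n"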

Upvotes: 0

Views: 533

Answers (1)

Lupanoide

Reputation: 3222

I'm trying to respond, but there is one thing I can't understand: how do you retrieve your field names from the data? In your code you take them from a list called header, which starts out empty, so I can't see where those values come from. Check my answer; I'm not sure I've understood you correctly.

from elasticsearch import Elasticsearch
from elasticsearch import helpers


index_name = "your_index_name"
doc_type = "your_doc_type"
esConnector = Elasticsearch(["http://192.168.1.1:9200/"])  # change your IP here
# csv_file_path is the path to your CSV file


def generate_data(csv_file_path):
    with open(csv_file_path, "r") as f:
        # the first line of the file holds the field names
        header = f.readline().rstrip().split(",")
        for count, line in enumerate(f, start=1):
            values = line.rstrip().split(",")
            # map each field name to the value in the same column
            data_dict = dict(zip(header, values))
            yield {
                '_op_type': 'index',
                '_index': index_name,
                '_type': doc_type,
                '_id': count,
                '_source': data_dict,
            }


for success, info in helpers.parallel_bulk(client=esConnector, actions=generate_data(csv_file_path), thread_count=4):
    if not success:
        print("Doc failed", info)
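Note that helpers.parallel_bulk is a lazy generator: nothing is sent until you iterate over it, as the loop above does. It chunks the actions and indexes them from a pool of worker threads (four here), yielding a (success, info) tuple per document.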

Upvotes: 1
