Reputation: 460
I am pulling data from Twitter, filtering it, building a generator, and trying to bulk index it into Elasticsearch with the bulk helper. However, I am receiving the following error, and I can't work out where exactly the problem is.
Traceback (most recent call last):
File "/Users/aqm1152/_acert_/basic/test_collection_dump.py", line 245, in <module>
sinceid, complete, api_counter, maxid = search_tweets(qu=query_word, cnt=cnt, sinceid=x , maxitr= 149 , fname=query_word)
File "/Users/aqm1152/_acert_/basic/test_collection_dump.py", line 138, in search_tweets
res = elastic_search.bulk_es(actions=bulk_content, )
File "/Users/aqm1152/_acert_/basic/elasticsearch/acert_basic_elastic_functions.py", line 68, in bulk_es
return helpers.bulk(self.es, actions=actions ,stats_only=True)
File "/Users/aqm1152/anaconda/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 194, in bulk
for ok, item in streaming_bulk(client, actions, **kwargs):
File "/Users/aqm1152/anaconda/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk
for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
File "/Users/aqm1152/anaconda/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 134, in _process_bulk_chunk
raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
elasticsearch.helpers.BulkIndexError: ('46 document(s) failed to index.', [{'index': {'_index': 'twitter', '_id': '866553007252488192', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866552145507700736', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866479151317962752', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866477250459430913', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866455181839486976', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866411931405570048', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866400265573892096', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, 
{'index': {'_index': 'twitter', '_id': '866399318957318144', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866395810300403713', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866366506124365824', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866228703478636545', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866206827389865984', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866137742476025856', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866026883284164610', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865968929684029441', '_type': 'tweet', 
'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865728096019894273', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865707222453571585', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865675939128029185', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865626970817572865', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865564611591815168', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865553684163211268', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865519159467098113', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 
'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865466383684874240', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865362662879895552', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865339244604264449', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865331847710068736', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865251599928700928', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865246748603797505', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865230204293308416', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], 
[lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865229926622011392', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865194609349083136', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865165953612619777', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865165573289902082', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865083343917993984', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865078786655694849', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '865078053134905344', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': 
'864963278233096192', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864948505143635968', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864929970702962688', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864871369217015809', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864812084521046016', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864742828550836224', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864662384060792832', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864521704248418304', '_type': 'tweet', 'error': {'type': 
'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864511301221068800', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '864310734817083392', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}])
It seems that Elasticsearch is having problems ingesting what appears to be geolocation data in many of the documents. Also, I generated about 671 documents from the tweets that I pulled, but only 454 seem to be indexed when I do a count in Elasticsearch. Additionally, the 46 tweets that failed because of the geo data are not in the index, and I can't tell exactly which field is causing the failure.
Here is my template that I use for indexing:
{
"template": "twitter",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"tweet": {
"properties": {
"coordinates": {
"type": "geo_point"
},
"created_at": {
"format": "EEE MMM dd HH:mm:ss Z YYYY",
"type": "date"
},
"entities": {
"properties": {
"hashtags": {
"properties": {
"indices": {
"type": "long",
"index": "not_analyzed"
},
"text": {
"type": "text"
}
}
},
"urls": {
"properties": {
"display_url": {
"type": "text",
"index": "not_analyzed"
},
"expanded_url": {
"type": "text",
"index": "not_analyzed"
},
"indices": {
"type": "long",
"index": "not_analyzed"
},
"url": {
"type": "text",
"index": "not_analyzed"
}
}
}
}
},
"symbols": {
"type": "integer",
"index": "not_analyzed"
},
"favorite_count": {
"type": "double",
"index": "not_analyzed"
},
"id": {
"type": "long"
},
"lang": {
"type": "text",
"index": "analyzed"
},
"place": {
"properties": {
"attributes": {
"type": "object"
},
"bounding_box": {
"type": "geo_point"
},
"country": {
"type": "text",
"index": "no"
},
"country_code": {
"type": "text"
},
"full_name": {
"type": "text",
"index": "no"
},
"id": {
"type": "text"
},
"name": {
"type": "text",
"index": "not_analyzed"
},
"place_type": {
"type": "text"
},
"url": {
"type": "text"
}
}
},
"retweet_count": {
"type": "long"
},
"source": {
"type": "text"
},
"text": {
"type": "text"
},
"user": {
"type":"object",
"properties": {
"id": {
"type": "long"
},
"screen_name": {
"type": "text"
}
}
}
}
}
}
}
Here is my code that I am using to create my generator and use helpers.bulk to ingest:
# Global variable
tweet_attributes = ['text','source','retweeted', 'retweet_count','place','lang','favorite_count','entities','id','created_at','user:id','user:screen_name','coordinates']
def _get_necessary_fields(tweets):
doc = defaultdict(dict)
fieldInfo = tweet_attributes
for tweet in tweets:
for fields in fieldInfo :
if (len(fields.split(':'))) == 2 :
keys = fields.split(':')
# nested array at one level
doc[keys[0]][keys[1]] = tweet[keys[0]][keys[1]]
#TODO implement for more than one level, needs better algo
else:
# for each field
if fields in tweet:
doc[fields] = tweet[fields]
yield doc
def _json_for_bulk_body(tweets,el):
    """Return a lazy iterable of bulk "index" actions, one per tweet.

    Each action wraps a document produced by ``_get_necessary_fields`` and
    uses the document's ``id`` as the Elasticsearch ``_id``.

    Args:
        tweets: iterable of tweets, passed straight to
            ``_get_necessary_fields``.
        el: index description; ``el[0]`` is used as the index name and
            ``el[1][0]`` as the document type.

    Returns:
        A generator of action dicts.
    """
    # TODO refactor this code when you have time :
    # http://stackoverflow.com/questions/20288770/how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python
    def _as_action(document):
        # Wrap a single document in the metadata the bulk helper expects.
        return {
            "_op_type": "index",
            "_index": el[0],       # index name
            "_type": el[1][0],     # document type
            "_id": document['id'], # id of the tweet
            "_source": document,
        }

    trimmed_docs = _get_necessary_fields(tweets)
    return (_as_action(document) for document in trimmed_docs)
helpers.bulk(self.es, actions=structured_json_body ,stats_only=True)
Can someone please explain why not all of the docs other than the 46 are being ingested, and also why those 46 docs are failing to be ingested?
Upvotes: 3
Views: 17610
Reputation: 217424
In your code, you need to check for that null condition and not set the `coordinates` field when it arises:
# for each field
if fields in tweet:
if tweet[fields] is not None: <--- add this check
doc[fields] = tweet[fields]
Upvotes: 3