Reputation: 784
I am trying to build a DataFrame in pandas, using the results of a very basic query to Elasticsearch. I am getting the data I need, but it's a matter of slicing the results in a way that builds the proper DataFrame. I really only care about getting the timestamp and path of each result. I have tried a few different es.search patterns.
Code:
from datetime import datetime
from elasticsearch import Elasticsearch
from pandas import DataFrame, Series
import pandas as pd
import matplotlib.pyplot as plt
es = Elasticsearch(host="192.168.121.252")
res = es.search(index="_all", doc_type='logs', body={"query": {"match_all": {}}}, size=2, fields=('path','@timestamp'))
This gives four chunks of data: [u'hits', u'_shards', u'took', u'timed_out']. My results are inside the hits.
res['hits']['hits']
Out[47]:
[{u'_id': u'a1XHMhdHQB2uV7oq6dUldg',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.086Z',
u'path': u'app2.log'}},
{u'_id': u'TcBvro_1QMqF4ORC-XlAPQ',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.200Z',
u'path': u'app1.log'}}]
The only things I care about are getting the timestamp and path for each hit.
res['hits']['hits'][0]['fields']
Out[48]:
{u'@timestamp': u'2014-08-07T12:36:00.086Z',
u'path': u'app2.log'}
I cannot for the life of me figure out how to get that result into a DataFrame in pandas. So for the two results returned, I would expect a DataFrame like:
                  timestamp      path
0  2014-08-07T12:36:00.086Z  app2.log
1  2014-08-07T12:36:00.200Z  app1.log
Upvotes: 36
Views: 36787
Reputation: 1304
Or you could use the json_normalize function of pandas:
from pandas import json_normalize
# on older pandas versions: from pandas.io.json import json_normalize
df = json_normalize(res['hits']['hits'])
And then filter the resulting DataFrame by column names.
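For example, to end up with exactly the two-column frame the question asks for (a sketch assuming hits shaped like the question's output, where the data sits under 'fields'):
# nested dicts are flattened into dot-separated column names, so the
# question's data appears as 'fields.@timestamp' and 'fields.path'
df = df[['fields.@timestamp', 'fields.path']]
df.columns = ['timestamp', 'path']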
Upvotes: 34
Reputation: 11
redata = [hit['_source'] for hit in res['hits']['hits']]
pd.DataFrame(redata)
If you just use the pandas module, this is the simplest solution. In my case this code took 20.5 ms, whereas pandas.io.json.json_normalize(res['hits']['hits']) took 291 ms, and the result is different.
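To see why the results differ: the plain constructor yields one column per key in _source, while json_normalize also flattens the hit metadata (_id, _index, _score, ...) into extra columns. A minimal sketch:
import pandas as pd
from pandas.io.json import json_normalize  # older import path, as in the timing above

hits = res['hits']['hits']

# plain pandas: columns are exactly the _source keys
df_plain = pd.DataFrame([hit['_source'] for hit in hits])

# json_normalize: adds _id, _index, _score, ... plus dotted _source.* columns
df_flat = json_normalize(hits)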
Upvotes: 0
Reputation: 659
With elasticsearch_dsl
you can search documents, get them by id, etc.
For instance
from elasticsearch_dsl import Document
# retrieve document whose _id is in the list of ids
s = Document.mget(ids, using=es_connection, index=myindex)
or
from elasticsearch_dsl import Search
# get (up to) 100 documents from a given index
s = Search(using=es_connection, index=myindex).extra(size=100)
Then, if you want to create a DataFrame and use the Elasticsearch ids as your DataFrame index, you can do as follows:
df = pd.DataFrame([{'id': r.meta.id, **r.to_dict()}
                   for r in s.execute()]).set_index('id', drop=True)
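If you only need a couple of fields, you can also ask Elasticsearch to return just those via Search.source(), which keeps the resulting frame small. A sketch reusing es_connection and myindex from above, with the field names from the question:
import pandas as pd
from elasticsearch_dsl import Search

# fetch only the two fields we need; everything else stays on the server
s = (Search(using=es_connection, index=myindex)
     .source(['@timestamp', 'path'])
     .extra(size=100))
df = pd.DataFrame([r.to_dict() for r in s.execute()])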
Upvotes: 2
Reputation: 6037
If your request is likely to return more than 10,000 documents from Elasticsearch, you will need to use the scrolling function of Elasticsearch. Documentation and examples for this function are rather difficult to find, so I will provide you with a full, working example:
import pandas as pd
from elasticsearch import Elasticsearch
import elasticsearch.helpers
es = Elasticsearch('http://localhost:9200')
body={"query": {"match_all": {}}}
results = elasticsearch.helpers.scan(es, query=body, index="my_index")
df = pd.DataFrame([document['_source'] for document in results])
Simply edit the names that start with "my_" (here, "my_index") to match your own setup.
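If the documents are wide, you can also cut down the transfer by asking only for the fields you need; the _source filter is standard Elasticsearch query DSL (field names here follow the question):
# only pull the two fields of interest through the scroll
body = {
    "query": {"match_all": {}},
    "_source": ["@timestamp", "path"],
}
results = elasticsearch.helpers.scan(es, query=body, index="my_index")
df = pd.DataFrame([document['_source'] for document in results])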
Upvotes: 6
Reputation: 91
I tested all the answers for performance and I found that the pandasticsearch
approach is the fastest by a large margin:
tests:
test1 (using from_dict)
%timeit -r 2 -n 5 teste1(resp)
10.5 s ± 247 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)
test2 (using a list)
%timeit -r 2 -n 5 teste2(resp)
2.05 s ± 8.17 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)
test3 (using import pandasticsearch as pdes)
%timeit -r 2 -n 5 teste3(resp)
39.2 ms ± 5.89 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)
test4 (using from pandas.io.json import json_normalize)
%timeit -r 2 -n 5 teste4(resp)
387 ms ± 19 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)
I hope this is useful to someone.
CODE:
index = 'teste_85'
size = 10000
fields = True
sort = ['col1', 'desc']
query = 'teste'
range_gte = '2016-01-01'
range_lte = 'now'

resp = esc.search(index=index,
                  size=size,
                  scroll='2m',
                  _source=fields,
                  doc_type='_doc',
                  body={
                      "sort": {sort[0]: {"order": sort[1]}},
                      "query": {
                          "bool": {
                              "must": [
                                  {"query_string": {"query": query}},
                                  {"range": {"anomes": {"gte": range_gte,
                                                        "lte": range_lte}}},
                              ]
                          }
                      }
                  })
def teste1(resp):
    # row-by-row append: rebuilds the frame on every iteration (slow)
    df = pd.DataFrame(columns=list(resp['hits']['hits'][0]['_source'].keys()))
    for hit in resp['hits']['hits']:
        df = df.append(df.from_dict(hit['_source'], orient='index').T)
    return df

def teste2(resp):
    # note: df is overwritten on each iteration, so this returns a
    # one-row frame built from the last hit only
    col = list(resp['hits']['hits'][0]['_source'].keys())
    for hit in resp['hits']['hits']:
        df = pd.DataFrame(list(hit['_source'].values()), col).T
    return df

def teste3(resp):
    df = pdes.Select.from_dict(resp).to_pandas()
    return df

def teste4(resp):
    df = json_normalize(resp['hits']['hits'])
    return df
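For comparison, the idiomatic plain-pandas version (a hypothetical teste5, not part of the benchmark above) builds the frame once from a list instead of appending row by row, which avoids the per-append cost of teste1:
def teste5(resp):
    # build the frame once from the full list of _source dicts
    return pd.DataFrame([hit['_source'] for hit in resp['hits']['hits']])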
Upvotes: 7
Reputation: 1340
For anyone who encounters this question as well: @CT Zhu has a nice answer, but I think it is a bit outdated. When you are using the elasticsearch_dsl package, the result is a bit different. Try this in that case:
# Obtain the results..
res = es_dsl.Search(using=con, index='_all')
res_content = res[0:100].execute()
# convert it to a list of dicts, by using the .to_dict() function
res_filtered = [x['_source'].to_dict() for x in res_content['hits']['hits']]
# Pass this on to the 'from_dict' function
A = pd.DataFrame.from_dict(res_filtered)
Upvotes: 3
Reputation: 6037
Better yet, you can use the fantastic pandasticsearch
library:
from elasticsearch import Elasticsearch
es = Elasticsearch('http://localhost:9200')
result_dict = es.search(index="recruit", body={"query": {"match_all": {}}})
from pandasticsearch import Select
pandas_df = Select.from_dict(result_dict).to_pandas()
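From there, trimming the frame down to the two columns the question asks about is ordinary pandas (the column names below are the question's, not necessarily those of the "recruit" index):
# plain pandas from here on; adjust column names to your mapping
pandas_df = pandas_df[['@timestamp', 'path']]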
Upvotes: 17
Reputation: 323
Here's a bit of code you might find useful for your work. It's simple and extensible, and it has saved me a lot of time when faced with just "grabbing" some data from Elasticsearch to analyze.
If you just want to grab all the data of a given index and doc_type on your localhost, you can do:
df = ElasticCom(index='index', doc_type='doc_type').search_and_export_to_df()
You can use any of the arguments you'd usually use in elasticsearch.search(), or specify a different host. You can also choose whether to include the _id or not, and specify whether the data is in '_source' or 'fields' (it tries to guess). It also tries to convert the field values by default (but you can switch that off).
Here's the code:
from elasticsearch import Elasticsearch
import pandas as pd


class ElasticCom(object):
    def __init__(self, index, doc_type, hosts='localhost:9200', **kwargs):
        self.index = index
        self.doc_type = doc_type
        self.es = Elasticsearch(hosts=hosts, **kwargs)

    def search_and_export_to_dict(self, *args, **kwargs):
        _id = kwargs.pop('_id', True)
        data_key = kwargs.pop('data_key', kwargs.get('fields')) or '_source'
        kwargs = dict({'index': self.index, 'doc_type': self.doc_type}, **kwargs)
        if kwargs.get('size', None) is None:
            # first ask for a single hit just to learn the total count,
            # then repeat the search asking for everything
            kwargs['size'] = 1
            t = self.es.search(*args, **kwargs)
            kwargs['size'] = t['hits']['total']
        return get_search_hits(self.es.search(*args, **kwargs), _id=_id,
                               data_key=data_key)

    def search_and_export_to_df(self, *args, **kwargs):
        convert_numeric = kwargs.pop('convert_numeric', True)
        convert_dates = kwargs.pop('convert_dates', 'coerce')
        df = pd.DataFrame(self.search_and_export_to_dict(*args, **kwargs))
        # note: convert_objects was removed in pandas 0.25; on modern
        # pandas use pd.to_numeric / pd.to_datetime per column instead
        if convert_numeric:
            df = df.convert_objects(convert_numeric=convert_numeric, copy=True)
        if convert_dates:
            df = df.convert_objects(convert_dates=convert_dates, copy=True)
        return df


def get_search_hits(es_response, _id=True, data_key=None):
    response_hits = es_response['hits']['hits']
    if len(response_hits) > 0:
        if data_key is None:
            # guess where the data lives: '_source' or 'fields'
            for hit in response_hits:
                if '_source' in hit.keys():
                    data_key = '_source'
                    break
                elif 'fields' in hit.keys():
                    data_key = 'fields'
                    break
            if data_key is None:
                raise ValueError("Neither _source nor fields were in response hits")
        if _id is False:
            return [x.get(data_key, None) for x in response_hits]
        else:
            return [dict(_id=x['_id'], **x.get(data_key, {})) for x in response_hits]
    else:
        return []
Upvotes: 1
Reputation: 54340
There is a nice tool called pd.DataFrame.from_dict that you can use in a situation like this:
In [34]:
Data = [{u'_id': u'a1XHMhdHQB2uV7oq6dUldg',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.086Z',
u'path': u'app2.log'}},
{u'_id': u'TcBvro_1QMqF4ORC-XlAPQ',
u'_index': u'logstash-2014.08.07',
u'_score': 1.0,
u'_type': u'logs',
u'fields': {u'@timestamp': u'2014-08-07T12:36:00.200Z',
u'path': u'app1.log'}}]
In [35]:
df = pd.concat(map(pd.DataFrame.from_dict, Data), axis=1)['fields'].T
In [36]:
print df.reset_index(drop=True)
@timestamp path
0 2014-08-07T12:36:00.086Z app2.log
1 2014-08-07T12:36:00.200Z app1.log
Here it is in four steps:
1. Read each item in the list (each is a dictionary) into a DataFrame.
2. Combine all the items into one big DataFrame by concat-ing them column-wise (axis=1); since we do step 1 for each item, we can use map to do it.
3. Access the columns labeled 'fields'.
4. Rotate the DataFrame 90 degrees (transpose) and reset_index if we want the index to be the default int sequence.
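An equivalent, shorter route for this particular shape of data (a sketch, not part of the original answer): since each hit's 'fields' value is already a flat dict, the list of them can go straight into the DataFrame constructor.
# each hit's 'fields' entry is already a flat dict, so no concat,
# transpose, or reset_index is needed
df = pd.DataFrame([d['fields'] for d in Data])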
Upvotes: 16