Nin4ikP
Nin4ikP

Reputation: 135

How to find equal values in different fields in Elasticsearch via Python Query?

I've got values in Elasticsearch (+Kibana) and want to make a Graph, where certain nodes are connected.

My fields are "prev" and "curr" and indicate the "previous" and the "current" page, which the user visited.

E.g:

So what I'm trying to do is searching for values, where current is equal to previous, to be able to connect those and visualize via Networkx-Graph in Kibana.

My problem is that I just started yesterday with query-syntax and don't know if this is even possible. All in all, my goal is to make a graph, where nodes are connected to a chain, e.g:

Meaning that somebody visited those pages in a certain order.

What I've tried for now is:

def getPrevList():
    previous = []
    previousQuery = {
        "size": 0,
        "aggs": {
            "topTerms": {
                "terms": {
                    "field": "prev",
                    "size": 50000
                }
            }
        }
    }
    results = es.search(index="wiki", body=previousQuery)["aggregations"]["topTerms"]["buckets"]
    for bucket in results:
        previous.append({
            "prev" : bucket["key"],
            "numDocs" : bucket["doc_count"]
        })
    return previous

prevs=getPrevList()

rowNum = 0;
totalNumReviews=0

for prevDetails in prevs:
    rowNum += 1
    totalNumDocs += prevDetails["numDocs"]
    prevId = prevDetails["prev"]

    q = {
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {"prev": prevId}
                    }
                ]
            }
        },
        "controls": {
            "sample_size": 10000,
            "use_significance": True
        },
        "vertices": [
            {
                "field": "curr",
                "size": VERTEX_SIZE,
                "min_doc_count": 1
            },
            {
                "field": "prev",
                "size": VERTEX_SIZE,
                "min_doc_count": 1
            }
        ],
        "connections": {
            "query": {
                "match_all": {}
            }
        }
    }

At the end, I'm doing the following:

results = es.transport.perform_request('POST', "/wiki/_xpack/_graph/_explore", body=q)  

# Use NetworkX to create a graph of prevs and currs we can analyze
G = nx.Graph()

for node in results["vertices"]:
    G.add_node(nodeId(node), type=node["field"])

for edge in results["connections"]:
    n1 = results["vertices"][int(edge["source"])]
    n2 = results["vertices"][int(edge["target"])]
    G.add_edge(nodeId(n1), nodeId(n2))

I copied it from another example, which worked well, but I can see that the "connections" are important to be able to connect the vertices.

As far as I understood, I need the query to find the correct "prev" field. The controls are not significant for now. And here comes the complex part for me: What am I writing in the vertices and connections part? Is it correct that I defined vertices as the prev and curr fields? And in the connections-query: for now I defined "match_all", but this is obviously not correct. I need a query, where I can "match" those, where prev equals curr and connect them.. but HOW??

Any hint is appreciated! Thank you in forward.

EDIT:

Like @Lupanoide suggested, I altered the code and have now two visualizations: the first one is the first suggested solution and it gives me this graph (part of it) (matplotlib, not Kibana yet):

dict2Graph

The second solution looks more crazy and is more likely to be the correct one, but I need to visualize it in Kibana first:

two for-loops

So the new end of my script is now:

gq = json.dumps(q)

workspaceID ="/f44c95c0-223d-11e9-b49e-bb0f8e1e7bae" # my v6.4.0 workspace

workspaceUrl = "graph#/workspace/"+workspaceID+"?query=" + urllib.quote_plus(gq)        
doc = {
    "url": workspaceUrl
}
res = es.index(index=connectionsIndexName, doc_type='task', id=0, body=doc)

My only problem now is that when I'm using Kibana to open the URL, I do not see the graph. Instead I get the "new Graph" page.

EDIT2 Okay, I send the query, but of course the query alone is not enough. I need to pass the graph and its connections, right? Is it possible?

Thank you very much!

Upvotes: 3

Views: 1026

Answers (1)

Lupanoide
Lupanoide

Reputation: 3222

EDIT: For your use case you need find all the values for field curr with the same prev value. So you need to groupBy all the pages that are clicked after a certain page. You can do that with terms aggregation . You need to build a query that on one hand returns, with a term aggregation, all the values for the prev field and then you aggregate against over all the curr values generated:

def getOccurrencyDict():

  body = {
  "size": 0,
  "aggs": {
    "getAllThePrevs": {
      "terms": {
        "field": "prev",
        "size": 40000
      },
      "aggs": {
        "getAllTheCurr": {
          "terms": {
            "field": "curr",
            "size": 40000
          }
        }
      }
    }
  }
}
result = es.search(index="my_index", doc_type="mydoctype", body=body)

Then you have to build a data structure that the class Graph() of Networkx library accepts. So you should build a dict of list and then pass that var to the fromdictoflist method:

dict2Graph = dict()
for res in result["aggregations"]["getAllThePrevs"]["buckets"]:
    dict2Graph[ res["key"] ] = list() #you create a dict of list with a prev value key
    dict2Graph[ res["key"] ].append(res["getAllTheCurr"]["buckets"]) # you append a list of dict composed by key `key` with the `curr` value, and key `doc_count` with the number of occurrence of the term `curr` before the term prev

Now you pass it to the networkx ingestion method:

G=nx.from_dict_of_lists(dict2Graph)

I have not tested the networkx ingestion, so if it doesn't work , it is because we passed a dict of list of dict inside it and not a dict of list, so you should change a little bit how you build your dict2Graph dict

If the aggregation of aggregation query is too slow you should use prtition. Please read here how you could reach partition aggregation in elastic

EDIT:

after a reading of the networkX documentation, you could do also in this way, without creating the intermediate data structure:

from elasticsearch import Elasticsearch
from elasticsearch.client.graph import GraphClient

es = Elasticsearch()
graph_client = GraphClient(es) 

def createGraphInKibana(prev):
    q = {
    "query": {
        "bool": {
            "must": [
                {
                    "term": {"prev": prev}
                }
            ]
        }
    },
    "controls": {
        "sample_size": 10000,
        "use_significance": True
    },
    "vertices": [
        {
            "field": "curr",
            "size": VERTEX_SIZE,
            "min_doc_count": 1
        },
        {
            "field": "prev",
            "size": VERTEX_SIZE,
            "min_doc_count": 1
        }
    ],
    "connections": {
        "query": {
            "match_all": {}
          }
       }
    }
    graph_client.explore(index="your_index", doc_type="your_doc_type", body=q)





G = nx.Graph()
for prev in result["aggregations"]["getAllThePrevs"]["buckets"]:
    createGraphInKibana(prev['key'])
    for curr in prev["getAllTheCurr"]["buckets"]:
        G.add_edge(prev["key"], curr["key"], weight=curr["doc_count"])

Upvotes: 2

Related Questions