Elasticsearch Aggregation to pandas Dataframe


I struggled with the same problem and have come to believe the reason is that the items in response_dict are not normal dicts, but an elasticsearch_dsl.utils.AttrList of elasticsearch_dsl.utils.AttrDict objects.

If you have an AttrList of AttrDicts, it's possible to do:

resp_dict = response.aggregations.name.buckets
new_response = [i._d_ for i in resp_dict]

This gives you a list of normal dicts instead, which will probably play nicer with other libraries such as pandas.
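Since _d_ is a private attribute, a slightly more defensive variant is to go through the public to_dict() method that AttrDict provides (as far as I know it hands back the same underlying data). Below is a minimal sketch of that idea. FakeBucket and buckets_to_dicts are hypothetical names made up for illustration, and FakeBucket only stands in for a real bucket so the snippet runs without a cluster.

```python
class FakeBucket:
    """Hypothetical stand-in for an AttrDict bucket (assumption, not the real class)."""

    def __init__(self, d):
        self._d_ = d

    def to_dict(self):
        return dict(self._d_)


def buckets_to_dicts(buckets):
    """Return plain dicts, using to_dict() when the item offers it."""
    return [b.to_dict() if hasattr(b, "to_dict") else dict(b) for b in buckets]


plain = buckets_to_dicts([
    FakeBucket({"key": "north", "doc_count": 3}),
    {"key": "south", "doc_count": 2},  # already a plain dict
])
print(plain)
```

With a real response you would pass response.aggregations.name.buckets instead of the hand-made list.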

Edit:

I wrote a recursive function that handles at least some cases. It is not extensively tested and not wrapped in a nice module or anything; it's just a script. The one_lvl function keeps track of all the siblings, and the siblings of parents, in the tree in a dictionary called tmp, and recurses when it finds a new named aggregation. It assumes a lot about the structure of the data, which I'm not sure is warranted in the general case.

I think the lvl suffix is necessary because names can be duplicated: key, for instance, exists at several aggregation levels.
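To make the name clash concrete, here is a small sketch with two hand-written buckets (the values are made up). Merging them directly lets the child overwrite the parent's key and doc_count, while suffixing each field with its level keeps both. The get_fields here is a simplified version with a hypothetical skip parameter, not the exact function from the script.

```python
def get_fields(bucket, lvl, skip=("buckets", "region")):
    """Suffix every scalar field name with the aggregation level."""
    return {f"{k}{lvl}": v for k, v in bucket.items() if k not in skip}


parent = {"key": "2020-01", "doc_count": 5}   # made-up level 1 bucket
child = {"key": "north", "doc_count": 3}      # made-up level 2 bucket

# Without a suffix the child silently replaces the parent's values.
clobbered = {**parent, **child}

# With a suffix both levels survive in the same flat row.
suffixed = {**get_fields(parent, 1), **get_fields(child, 2)}

print(clobbered)
print(suffixed)
```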

#!/usr/bin/env python3
from elasticsearch_dsl.query import QueryString
from elasticsearch_dsl import Search, A
from elasticsearch import Elasticsearch
import pandas as pd

# Placeholder connection settings
PORT = 9250
TIMEOUT = 10000
USR = "someusr"
PW = "somepw"
HOST = "test.com"
INDEX = "my_index"
QUERY = "foobar"

client = Elasticsearch([HOST], port=PORT, http_auth=(USR, PW), timeout=TIMEOUT)

qs = QueryString(query=QUERY)
s = Search(using=client, index=INDEX).query(qs)
s = s.params(size=0)

# Named aggregations, nested below as dates -> region -> county
agg = {
    "dates": A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"),
    "region": A("terms", field="region", size=10),
    "county": A("terms", field="county", size=10)
}

s.aggs.bucket("dates", agg["dates"]). \
       bucket("region", agg["region"]). \
       bucket("county", agg["county"])

resp = s.execute()

# Unwrap the top-level buckets into plain dicts
data = {"buckets": [i._d_ for i in resp.aggregations.dates]}
# Keys that are recursed into rather than copied into a row
rec_list = ["buckets"] + [*agg.keys()]

def get_fields(i, lvl):
    # Copy the scalar fields, suffixing each name with the level
    return {(k + f"{lvl}"): v for k, v in i.items() if k not in rec_list}

def one_lvl(data, tmp, lvl, rows, maxlvl):
    # tmp carries the fields collected so far on the way down the tree
    tmp = {**tmp, **get_fields(data, lvl)}
    if "buckets" not in data:
        rows.append(tmp)

    for d in data:
        if d in ["buckets"]:
            for v, b in enumerate(data[d]):
                tmp = {**tmp, **get_fields(data[d][v], lvl)}
                for k in b:
                    if k in agg.keys():
                        # Found a named sub-aggregation: go one level deeper
                        one_lvl(data[d][v][k], tmp, lvl+1, rows, maxlvl)
                    else:
                        if lvl == maxlvl:
                            tmp = {**tmp, (k + f"{lvl}"): data[d][v][k]}
                            rows.append(tmp)

    return rows

rows = one_lvl(data, {}, 1, [], len(agg))
df = pd.DataFrame(rows)
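If you want to try the flattening idea without a cluster, the sketch below runs on a hand-written dict shaped like a nested dates/region/county response (all values are made up). Note that flatten_buckets is a simplified alternative I'm using for illustration, not the one_lvl function above: it yields exactly one row per innermost bucket and assumes every sub-aggregation is a bucket aggregation with a "buckets" list.

```python
def flatten_buckets(node, agg_names, lvl=1, parent=None):
    """Yield one flat dict per innermost bucket of a nested aggregation."""
    parent = parent or {}
    for bucket in node["buckets"]:
        # Scalar fields of this bucket, suffixed with the level.
        fields = {f"{k}{lvl}": v for k, v in bucket.items() if k not in agg_names}
        row = {**parent, **fields}
        children = [k for k in bucket if k in agg_names]
        if not children:
            yield row  # deepest level reached
        for name in children:
            yield from flatten_buckets(bucket[name], agg_names, lvl + 1, row)


# Made-up sample mimicking the raw aggregation structure.
sample = {
    "buckets": [
        {
            "key_as_string": "2020-01-01",
            "doc_count": 5,
            "region": {
                "buckets": [
                    {"key": "north", "doc_count": 3,
                     "county": {"buckets": [{"key": "a", "doc_count": 2},
                                            {"key": "b", "doc_count": 1}]}},
                    {"key": "south", "doc_count": 2,
                     "county": {"buckets": [{"key": "c", "doc_count": 2}]}},
                ]
            },
        }
    ]
}

rows = list(flatten_buckets(sample, {"dates", "region", "county"}))
for r in rows:
    print(r)
```

The resulting list of dicts can be passed to pd.DataFrame(rows) in the same way as in the script.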