Elasticsearch Aggregation to pandas Dataframe
Struggling with the same problem, I've come to believe the reason for this being that the response_dict are not normal dicts, but an elasticsearch_dsl.utils.AttrList
of elasticsearch_dsl.utils.AttrDict
.
If you have an AttrList
of AttrDicts
, it's possible to do:
resp_dict = response.aggregations.name.bucketsnew_response = [i._d_ for i in resp_dict]
To get a list of normal dicts instead. This will probably play nicer with other libraries.
Edit:
I wrote a recursive function which at least handles some cases, not extensively tested yet though and not wrapped in a nice module or anything. It's just a script. The one_lvl
function keeps track of all the siblings and siblings of parents in the tree in a dictionary called tmp
, and recurses when it finds a new named aggregation. It assumes a lot about the structure of the data, which I'm not sure is warranted in the general case.
The lvl
stuff is necessary I think because you might have duplicate names, so key
exists at several aggregation-levels for instance.
#!/usr/bin/env python3from elasticsearch_dsl.query import QueryStringfrom elasticsearch_dsl import Search, Afrom elasticsearch import Elasticsearchimport pandas as pdPORT = 9250TIMEOUT = 10000USR = "someusr"PW = "somepw"HOST = "test.com"INDEX = "my_index"QUERY = "foobar"client = Elasticsearch([HOST], port = PORT, http_auth=(USR, PW), timeout = TIMEOUT)qs = QueryString(query = QUERY)s = Search(using=client, index=INDEX).query(qs)s = s.params(size = 0)agg= { "dates" : A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"), "region" : A("terms", field="region", size=10), "county" : A("terms", field="county", size = 10)}s.aggs.bucket("dates", agg["dates"]). \ bucket("region", agg["region"]). \ bucket("county", agg["county"])resp = s.execute()data = {"buckets" : [i._d_ for i in resp.aggregations.dates]}rec_list = ["buckets"] + [*agg.keys()]def get_fields(i, lvl): return {(k + f"{lvl}"):v for k, v in i.items() if k not in rec_list}def one_lvl(data, tmp, lvl, rows, maxlvl): tmp = {**tmp, **get_fields(data, lvl)} if "buckets" not in data: rows.append(tmp) for d in data: if d in ["buckets"]: for v, b in enumerate(data[d]): tmp = {**tmp, **get_fields(data[d][v], lvl)} for k in b: if k in agg.keys(): one_lvl(data[d][v][k], tmp, lvl+1, rows, maxlvl) else: if lvl == maxlvl: tmp = {**tmp, (k + f"{lvl}") : data[d][v][k]} rows.append(tmp) return rowsrows = one_lvl(data, {}, 1, [], len(agg))df = pd.DataFrame(rows)