
How to Create Dataframe from AWS Athena using Boto3 get_query_results method


get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?

If you try to request more rows, for example:

client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

you will get the following error:

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.
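For reference, get_query_results can still page past the first 1,000 rows by following NextToken, but every page is capped at 1,000 rows, so pulling two million rows this way means thousands of API calls. A minimal sketch of that loop (client and res are assumed to come from the start_query_execution call shown further down):

rows = []
next_token = None
while True:
    kwargs = {'QueryExecutionId': res['QueryExecutionId'], 'MaxResults': 1000}
    if next_token:
        kwargs['NextToken'] = next_token
    page = client.get_query_results(**kwargs)
    rows.extend(page['ResultSet']['Rows'])
    next_token = page.get('NextToken')
    if not next_token:
        break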

You can obtain millions of rows if you read the results file directly from your S3 bucket (in the following example into a Pandas DataFrame):

def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3',
                                   region_name=self.region_name,
                                   aws_access_key_id=self.aws_access_key_id,
                                   aws_secret_access_key=self.aws_secret_access_key)
    response = self.resource \
        .Bucket(self.bucket) \
        .Object(key=self.folder + self.filename + '.csv') \
        .get()
    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')

The self.filename can be:

self.filename = response['QueryExecutionId'] + ".csv"

because Athena names its output files after the QueryExecutionId. Below is all my code, which takes a query and returns a dataframe with all the rows and columns.

import time
import boto3
import pandas as pd
import io


class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output = 's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        # Start the query execution and remember its ID, which is also the
        # name Athena gives the result file in S3.
        try:
            self.client = boto3.client('athena',
                                       region_name=self.region_name,
                                       aws_access_key_id=self.aws_access_key_id,
                                       aws_secret_access_key=self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={
                    'Database': self.database
                },
                ResultConfiguration={
                    'OutputLocation': self.s3_output,
                }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])
        except Exception as e:
            print(e)
        return response

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:
            # Poll until the query leaves the QUEUED/RUNNING states.
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))

            df = self.obtain_data()
            return df
        except Exception as e:
            print(e)

    def obtain_data(self):
        # Read the CSV that Athena wrote to S3 straight into a dataframe.
        try:
            self.resource = boto3.resource('s3',
                                           region_name=self.region_name,
                                           aws_access_key_id=self.aws_access_key_id,
                                           aws_secret_access_key=self.aws_secret_access_key)
            response = self.resource \
                .Bucket(self.bucket) \
                .Object(key=self.folder + self.filename + '.csv') \
                .get()
            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
        except Exception as e:
            print(e)


if __name__ == "__main__":
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
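If you prefer not to rebuild the key from self.folder and a hard-coded '.csv' suffix, the exact output path can also be read back from the query execution itself. A minimal sketch (splitting the s3:// URL into bucket and key is an assumption about the path layout Athena uses):

meta = self.client.get_query_execution(QueryExecutionId=self.filename)
output_location = meta['QueryExecution']['ResultConfiguration']['OutputLocation']
# e.g. s3://my_bucket/my_folder/<QueryExecutionId>.csv
bucket, key = output_location.replace('s3://', '').split('/', 1)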


I have a solution for my first question, using the following function:

def results_to_df(results):
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
    listed_results = []
    for res in results['ResultSet']['Rows'][1:]:
        values = []
        for field in res['Data']:
            try:
                values.append(list(field.values())[0])
            except:
                values.append(list(' '))
        listed_results.append(
            dict(zip(columns, values))
        )
    return listed_results

and then:

t = results_to_df(response)
pd.DataFrame(t)
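For context, response here is the raw output of get_query_results for the finished query, e.g. (a sketch, assuming res holds the start_query_execution response):

response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'])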

As for my 2nd question, and at the request of @EricBellet, I'm also adding my approach for pagination, which I find inefficient and slower compared to loading the results from the Athena output in S3:

import time
import boto3
import pandas as pd


def run_query(query, database, s3_output):
    '''
    Function for executing Athena queries and returning the query ID
    '''
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response


def format_result(results):
    '''
    This function formats the results so they can be appended in the needed format.
    '''
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
    formatted_results = []
    for result in results['ResultSet']['Rows'][0:]:
        values = []
        for field in result['Data']:
            try:
                values.append(list(field.values())[0])
            except:
                values.append(list(' '))
        formatted_results.append(
            dict(zip(columns, values))
        )
    return formatted_results


client = boto3.client('athena')
res = run_query(query_2, database, s3_output)  # query Athena

marker = None
formatted_results = []
query_id = res['QueryExecutionId']
i = 0
start_time = time.time()

while True:
    paginator = client.get_paginator('get_query_results')
    response_iterator = paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'MaxItems': 1000,
            'PageSize': 1000,
            'StartingToken': marker})
    for page in response_iterator:
        i = i + 1
        format_page = format_result(page)
        if i == 1:
            formatted_results = pd.DataFrame(format_page)
        elif i > 1:
            formatted_results = formatted_results.append(pd.DataFrame(format_page))
    try:
        marker = page['NextToken']
    except KeyError:
        break

print("My program took", time.time() - start_time, "to run")

It's not formatted that well, but I think it does the job...
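As a side note, the botocore paginator can walk through every page on its own if you drop the MaxItems cap, so the outer while loop and the manual marker are not strictly needed. A minimal sketch of the same idea (client, query_id and format_result as above):

paginator = client.get_paginator('get_query_results')
pages = paginator.paginate(QueryExecutionId=query_id,
                           PaginationConfig={'PageSize': 1000})
frames = [pd.DataFrame(format_result(page)) for page in pages]
df = pd.concat(frames, ignore_index=True)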


You can use AWS Data Wrangler to create a Pandas dataframe directly, querying through Athena.

import awswrangler as wr

df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")

You can find more information here
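If the result is too large to hold comfortably in memory, read_sql_query also takes a chunksize argument and then yields the result as an iterator of dataframes instead of one big frame; a minimal sketch (table and database names are placeholders):

import awswrangler as wr

# Process the query result in chunks of roughly 100,000 rows each
for chunk in wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>",
                                      database="<database_name>",
                                      chunksize=100000):
    print(len(chunk))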